icechunk-python 0.1.0-alpha.1

Transactional storage engine for Zarr designed for use on cloud object storage
Documentation
import asyncio
import random

import icechunk
import zarr

N = 15


async def write_to_store(array, x, y, barrier):
    await barrier.wait()
    await asyncio.sleep(random.uniform(0, 0.5))
    array[x, y] = x * y
    # await asyncio.sleep(0)


async def read_store(array, x, y, barrier):
    await barrier.wait()
    while True:
        # print(f"reading {x},{y}")
        value = array[x, y]
        if value == x * y:
            break
        await asyncio.sleep(random.uniform(0, 0.1))


async def list_store(store, barrier):
    """Poll the store listing until it contains exactly the expected keys.

    The expected key set is the root ``zarr.json``, ``array/zarr.json`` and
    one ``array/c/{x}/{y}`` key for every ``x`` and ``y`` in ``range(N)``.
    After waiting on ``barrier`` the full listing from
    ``store.list_prefix("")`` is compared against that set every 0.1 seconds;
    the coroutine returns only when the two sets are equal.

    Args:
        store: Object whose ``list_prefix("")`` yields keys asynchronously.
        barrier: Object whose awaitable ``wait()`` gates the start of polling.
    """
    chunk_keys = {f"array/c/{x}/{y}" for x in range(N) for y in range(N)}
    expected = {"zarr.json", "array/zarr.json"} | chunk_keys

    await barrier.wait()
    while True:
        listed = {key async for key in store.list_prefix("")}
        if listed == expected:
            return
        await asyncio.sleep(0.1)


async def test_concurrency():
    """Exercise concurrent reads, writes and listing against one store.

    A store is opened in write mode with ``StorageConfig.memory`` and an
    ``N`` x ``N`` float array with one-element chunks is created in its root
    group. One listing task, ``N * N`` reader tasks and ``N * N`` writer
    tasks are then started in a task group; all of them wait on a shared
    barrier sized for exactly that many parties. Once every task has
    finished, the store is committed and each cell is checked to hold
    ``x * y``.
    """
    storage = icechunk.StorageConfig.memory(prefix="concurrency")
    store = await icechunk.IcechunkStore.open(mode="w", storage=storage)

    group = zarr.group(store=store, overwrite=True)
    array = group.create_array(
        "array",
        shape=(N, N),
        chunk_shape=(1, 1),
        dtype="f8",
        fill_value=1e23,
    )

    # Every cell of the grid, in row-major order.
    cells = [(x, y) for x in range(N) for y in range(N)]

    # One reader and one writer per cell, plus the single listing task.
    barrier = asyncio.Barrier(2 * N * N + 1)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(list_store(store, barrier), name="listing")

        for x, y in cells:
            tg.create_task(read_store(array, x, y, barrier), name=f"read {x},{y}")

        for x, y in cells:
            tg.create_task(
                write_to_store(array, x, y, barrier), name=f"write {x},{y}"
            )

    await store.commit("commit")

    # Look the array up again through the group after the commit.
    array = group["array"]
    assert isinstance(array, zarr.Array)

    for x, y in cells:
        assert array[x, y] == x * y

    # FIXME: add assertions
    print("done")