import asyncio
import random
import icechunk
import zarr
# Side length of the square test array. The test below creates an (N, N)
# array and spawns one reader and one writer task per cell, so the amount of
# concurrent work grows with N * N.
N = 15
async def write_to_store(array, x, y, barrier):
    """Write ``x * y`` into ``array[x, y]`` after a random pause.

    The coroutine first waits on ``barrier`` so that it is released together
    with every other task sharing that barrier, then sleeps for a random
    duration between 0 and 0.5 seconds before performing the assignment.
    """
    await barrier.wait()

    # Stagger the writes so they interleave with the other tasks.
    delay = random.uniform(0, 0.5)
    await asyncio.sleep(delay)

    array[x, y] = x * y
async def read_store(array, x, y, barrier):
    """Poll ``array[x, y]`` until it holds ``x * y``.

    The coroutine waits on ``barrier`` before its first read. Each time the
    cell does not yet contain the expected product it sleeps for a random
    duration between 0 and 0.1 seconds and tries again; it returns as soon
    as the expected value is observed.
    """
    await barrier.wait()

    target = x * y
    while array[x, y] != target:
        # Not written yet: back off briefly before re-reading.
        await asyncio.sleep(random.uniform(0, 0.1))
async def list_store(store, barrier):
    """Poll the store listing until it matches the full expected key set.

    The expected keys are ``"zarr.json"``, ``"array/zarr.json"`` and one
    ``"array/c/{x}/{y}"`` entry for every ``x`` and ``y`` in ``range(N)``.
    After waiting on ``barrier``, the coroutine repeatedly collects every key
    yielded by ``store.list_prefix("")`` into a set and compares it with the
    expected set, sleeping 0.1 seconds between attempts. It returns once the
    two sets are equal.
    """
    expected = {"zarr.json", "array/zarr.json"} | {
        f"array/c/{x}/{y}" for x in range(N) for y in range(N)
    }

    await barrier.wait()

    # The snapshot is built inside the loop condition, so it is discarded
    # right after the comparison instead of being held across the sleep.
    while {key async for key in store.list_prefix("")} != expected:
        await asyncio.sleep(0.1)
async def test_concurrency():
    """Exercise interleaved reads, writes and listings on one store.

    An in-memory Icechunk store is opened and an ``(N, N)`` ``f8`` array with
    ``(1, 1)`` chunks is created in it. One listing task, one reader per cell
    and one writer per cell are then started inside a task group; all of them
    wait on a shared barrier before doing any work. After the task group
    finishes, the store is committed and every cell is checked to hold
    ``x * y``.
    """
    storage = icechunk.StorageConfig.memory(prefix="concurrency")
    store = await icechunk.IcechunkStore.open(mode="w", storage=storage)

    group = zarr.group(store=store, overwrite=True)
    array = group.create_array(
        "array",
        shape=(N, N),
        chunk_shape=(1, 1),
        dtype="f8",
        # No cell's expected product equals this, so readers keep polling
        # until the matching writer has run.
        fill_value=1e23,
    )

    cells = [(x, y) for x in range(N) for y in range(N)]

    # Parties: one reader and one writer per cell, plus the listing task.
    barrier = asyncio.Barrier(2 * N * N + 1)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(list_store(store, barrier), name="listing")

        for x, y in cells:
            tg.create_task(
                read_store(array, x, y, barrier), name=f"read {x},{y}"
            )

        for x, y in cells:
            tg.create_task(
                write_to_store(array, x, y, barrier), name=f"write {x},{y}"
            )

    await store.commit("commit")

    # Fetch the array again through the group and verify the final contents.
    committed = group["array"]
    assert isinstance(committed, zarr.Array)
    for x, y in cells:
        assert committed[x, y] == x * y

    print("done")