from __future__ import annotations
import typing
if typing.TYPE_CHECKING:
from typing_extensions import Unpack
from typing import Any
import pandas as pd
import polars as pl
from . import _spec
ListOfDicts = list[dict[str, Any]]
DictOfLists = dict[str, list[Any]]
async def async_collect(
datatype: _spec.Datatype,
output_format: _spec.PythonOutput = 'polars',
**kwargs: Unpack[_spec.CryoCliArgs],
) -> pl.DataFrame | pd.DataFrame | ListOfDicts | DictOfLists:
from . import _args
from . import _cryo_rust
cli_args = _args.parse_cli_args(**kwargs)
cli_args['chunk_size'] = 20_000_000
result: pl.DataFrame = await _cryo_rust._collect(datatype, **cli_args)
if output_format == 'polars':
return result
elif output_format == 'pandas':
return result.to_pandas()
elif output_format == 'list':
return result.to_dicts()
elif output_format == 'dict':
return result.to_dict(as_series=False)
else:
raise Exception('unknown output format')
def collect(
datatype: _spec.Datatype,
output_format: _spec.PythonOutput = 'polars',
**kwargs: Unpack[_spec.CryoCliArgs],
) -> pl.DataFrame | pd.DataFrame | ListOfDicts | DictOfLists:
import asyncio
coroutine = async_collect(datatype, output_format=output_format, **kwargs)
try:
import concurrent.futures
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(loop.run_until_complete, coroutine) return future.result() except RuntimeError:
return asyncio.run(coroutine)