from __future__ import annotations
import typing
if typing.TYPE_CHECKING:
from typing_extensions import Unpack
from . import _spec
async def async_freeze(
datatype: str | typing.Sequence[str],
**kwargs: Unpack[_spec.CryoCliArgs],
) -> None:
from . import _cryo_rust from . import _args
if isinstance(datatype, str):
datatypes = [datatype]
elif isinstance(datatype, list):
datatypes = datatype
else:
raise Exception('invalid format for datatype(s)')
cli_args = _args.parse_cli_args(**kwargs)
return await _cryo_rust._freeze(datatypes, **cli_args)
def freeze(
datatype: str | typing.Sequence[str],
**kwargs: Unpack[_spec.CryoCliArgs],
) -> None:
import asyncio
coroutine = async_freeze(datatype, **kwargs)
try:
import concurrent.futures
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(loop.run_until_complete, coroutine) return future.result() except RuntimeError:
return asyncio.run(coroutine)