from __future__ import annotations
import asyncio
from typing import Any
async def _to_thread(func: Any, /, *args: Any, **kwargs: Any) -> Any:
if hasattr(asyncio, "to_thread"):
return await asyncio.to_thread(func, *args, **kwargs)
loop = asyncio.get_running_loop()
def _run() -> Any:
return func(*args, **kwargs)
return await loop.run_in_executor(None, _run)
class AsyncBytesWriter:
def __init__(self, inner: Any) -> None:
self._inner = inner
async def write(self, data: bytes) -> None:
await _to_thread(self._inner.write, data)
async def wait_for_matched_subscription(
self, count: int, timeout_secs: float,
) -> None:
await _to_thread(
self._inner.wait_for_matched_subscription, count, timeout_secs,
)
def matched_subscription_count(self) -> int:
return self._inner.matched_subscription_count()
def publication_matched_status(self) -> tuple:
return self._inner.publication_matched_status()
def liveliness_lost_status(self) -> tuple:
return self._inner.liveliness_lost_status()
def offered_deadline_missed_status(self) -> tuple:
return self._inner.offered_deadline_missed_status()
class AsyncBytesReader:
def __init__(self, inner: Any) -> None:
self._inner = inner
async def wait_for_data(self, timeout_secs: float) -> None:
await _to_thread(self._inner.wait_for_data, timeout_secs)
async def wait_for_matched_publication(
self, count: int, timeout_secs: float,
) -> None:
await _to_thread(
self._inner.wait_for_matched_publication, count, timeout_secs,
)
def take(self) -> list[bytes]:
return self._inner.take()
def matched_publication_count(self) -> int:
return self._inner.matched_publication_count()
def subscription_matched_status(self) -> tuple:
return self._inner.subscription_matched_status()
def sample_lost_status(self) -> tuple:
return self._inner.sample_lost_status()
def requested_deadline_missed_status(self) -> tuple:
return self._inner.requested_deadline_missed_status()
class AsyncShapeWriter:
def __init__(self, inner: Any) -> None:
self._inner = inner
async def write(self, shape: Any) -> None:
await _to_thread(self._inner.write, shape)
async def wait_for_matched_subscription(
self, count: int, timeout_secs: float,
) -> None:
await _to_thread(
self._inner.wait_for_matched_subscription, count, timeout_secs,
)
def register_instance(self, shape: Any) -> int:
return self._inner.register_instance(shape)
def dispose(self, shape: Any) -> None:
self._inner.dispose(shape)
def unregister_instance(self, shape: Any) -> None:
self._inner.unregister_instance(shape)
class AsyncShapeReader:
def __init__(self, inner: Any) -> None:
self._inner = inner
async def wait_for_data(self, timeout_secs: float) -> None:
await _to_thread(self._inner.wait_for_data, timeout_secs)
async def wait_for_matched_publication(
self, count: int, timeout_secs: float,
) -> None:
await _to_thread(
self._inner.wait_for_matched_publication, count, timeout_secs,
)
def take(self) -> list[Any]:
return self._inner.take()
class AsyncWaitSet:
def __init__(self, inner: Any) -> None:
self._inner = inner
def attach_guard_condition(self, gc: Any) -> None:
self._inner.attach_guard_condition(gc)
async def wait(self, timeout_secs: float) -> int:
return await _to_thread(self._inner.wait, timeout_secs)
__all__ = [
"AsyncBytesReader",
"AsyncBytesWriter",
"AsyncShapeReader",
"AsyncShapeWriter",
"AsyncWaitSet",
]