from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Generic, Iterator, Optional, TypeVar
from net import Net, StoredEvent
# Type of the values produced by a typed stream's parse callable.
T = TypeVar("T")

# Default maximum number of events requested from the bus per poll.
DEFAULT_LIMIT = 100
# Default delay, in seconds, slept after a poll that returned a partial batch;
# also the starting value of the empty-poll backoff.
DEFAULT_POLL_INTERVAL = 0.005
# Default ceiling, in seconds, for the empty-poll backoff delay.
DEFAULT_MAX_BACKOFF = 0.1
@dataclass
class SubscribeOpts:
    """Tunable options for a polling event subscription.

    Every field has a default, so ``SubscribeOpts()`` is a valid
    configuration. Durations are expressed in seconds.
    """

    # Maximum number of events requested per poll. A response shorter than
    # this is treated as a partial batch and is followed by a pause.
    limit: int = DEFAULT_LIMIT
    # Forwarded unchanged to the bus as the ``filter`` argument of each poll.
    filter: Optional[str] = None
    # Forwarded unchanged to the bus as the ``ordering`` argument of each poll.
    ordering: Optional[str] = None
    # Pause after a partial batch; also the initial delay after an empty poll.
    poll_interval: float = DEFAULT_POLL_INTERVAL
    # Cap on the growing delay used while polls keep coming back empty.
    max_backoff: float = DEFAULT_MAX_BACKOFF
    # Total time iteration may run before it ends on its own; ``None``
    # disables the limit.
    timeout: Optional[float] = None
class EventStream:
    """Polling iterator over the events stored on a bus.

    The stream can be consumed with ``for`` or ``async for``. Each cycle
    polls the bus from the current cursor, yields the returned events, and
    then decides how long to wait before the next poll:

    * a batch at least ``limit`` long: poll again immediately;
    * a shorter, non-empty batch: wait ``poll_interval``;
    * an empty batch: wait the current backoff delay, which starts at
      ``poll_interval``, doubles after every consecutive empty poll up to
      ``max_backoff``, and resets as soon as events arrive.

    Iteration ends once ``stop()`` has been called or ``timeout`` seconds
    have passed since iteration started. Waits are shortened so that they
    never run past the timeout.
    """

    def __init__(self, bus: Net, opts: Optional[SubscribeOpts] = None) -> None:
        """Create a stream over *bus*.

        Args:
            bus: Object whose ``poll(limit=, cursor=, filter=, ordering=)``
                method supplies batches of events.
            opts: Subscription options; defaults to ``SubscribeOpts()``.
        """
        self._bus = bus
        self._opts = opts or SubscribeOpts()
        # Position handed to the next poll; None until a batch has arrived.
        self._cursor: Optional[str] = None
        self._stopped = False

    def stop(self) -> None:
        """Request that iteration end.

        The flag is checked at the start of each poll cycle, so the events of
        a batch that is already being yielded are still delivered.
        """
        self._stopped = True

    def _poll(self):
        """Fetch the next batch from the bus, starting at the current cursor."""
        return self._bus.poll(
            limit=self._opts.limit,
            cursor=self._cursor,
            filter=self._opts.filter,
            ordering=self._opts.ordering,
        )

    def _delays(self) -> tuple[float, float]:
        """Return ``(poll_interval, max_backoff)`` made safe to use as sleeps."""
        poll_interval = max(0.0, self._opts.poll_interval)
        # Floor the cap at poll_interval: a smaller cap would make an idle
        # stream poll *faster* than a busy one after the first empty poll.
        max_backoff = max(poll_interval, self._opts.max_backoff)
        return poll_interval, max_backoff

    def _remaining(self, start: float) -> Optional[float]:
        """Return the seconds left before the timeout, or None if unlimited."""
        timeout = self._opts.timeout
        if timeout is None:
            return None
        return timeout - (time.monotonic() - start)

    def _expired(self, start: float) -> bool:
        """Return True once the configured timeout has elapsed."""
        remaining = self._remaining(start)
        return remaining is not None and remaining <= 0

    def _clamp(self, delay: float, start: float) -> float:
        """Shorten *delay* so that a sleep never runs past the timeout."""
        remaining = self._remaining(start)
        if remaining is None:
            return delay
        return max(0.0, min(delay, remaining))

    def __iter__(self) -> Iterator[StoredEvent]:
        """Yield events, blocking the calling thread between polls."""
        poll_interval, max_backoff = self._delays()
        backoff = poll_interval
        start = time.monotonic()
        while not self._stopped:
            if self._expired(start):
                return
            response = self._poll()
            if len(response) > 0:
                backoff = poll_interval
                # The cursor moves before the events are handed out, so the
                # next poll is issued from this batch's next_id.
                self._cursor = response.next_id
                for event in response:
                    yield event
                if len(response) < self._opts.limit:
                    time.sleep(self._clamp(poll_interval, start))
            else:
                time.sleep(self._clamp(backoff, start))
                backoff = min(backoff * 2, max_backoff)

    async def __aiter__(self) -> AsyncIterator[StoredEvent]:
        """Yield events, awaiting ``asyncio.sleep`` between polls.

        NOTE(review): the poll itself is a plain synchronous call; if the
        bus's ``poll`` blocks, the event loop is blocked for that long.
        """
        poll_interval, max_backoff = self._delays()
        backoff = poll_interval
        start = time.monotonic()
        while not self._stopped:
            if self._expired(start):
                return
            response = self._poll()
            if len(response) > 0:
                backoff = poll_interval
                # The cursor moves before the events are handed out, so the
                # next poll is issued from this batch's next_id.
                self._cursor = response.next_id
                for event in response:
                    yield event
                if len(response) < self._opts.limit:
                    await asyncio.sleep(self._clamp(poll_interval, start))
            else:
                await asyncio.sleep(self._clamp(backoff, start))
                backoff = min(backoff * 2, max_backoff)
class TypedEventStream(Generic[T]):
    """Event stream that yields parsed values instead of stored events.

    Wraps an ``EventStream`` and passes the ``raw`` attribute of every event
    it produces through the ``parse`` callable given at construction. Both
    ``for`` and ``async for`` are supported.
    """

    def __init__(
        self,
        bus: Net,
        parse: Callable[[str], T],
        opts: Optional[SubscribeOpts] = None,
    ) -> None:
        """Build the underlying stream over *bus* and remember *parse*.

        Args:
            bus: Bus handed to the wrapped ``EventStream``.
            parse: Callable applied to each event's ``raw`` value.
            opts: Subscription options handed to the wrapped ``EventStream``.
        """
        self._parse = parse
        self._inner = EventStream(bus, opts)

    def stop(self) -> None:
        """Ask the wrapped stream to stop iterating."""
        self._inner.stop()

    def __iter__(self) -> Iterator[T]:
        """Yield the parsed form of each event from the wrapped stream."""
        yield from (self._parse(stored.raw) for stored in self._inner)

    async def __aiter__(self) -> AsyncIterator[T]:
        """Asynchronously yield the parsed form of each event."""
        async for stored in self._inner:
            parsed = self._parse(stored.raw)
            yield parsed