from __future__ import annotations
import json
from dataclasses import replace
from typing import Any, Callable, Generic, Iterator, Optional, TypeVar
from net import Net
from net_sdk.stream import EventStream, SubscribeOpts, TypedEventStream
# Type variable for the event payload type a TypedChannel is parameterised over.
T = TypeVar("T")
def _to_dict(event: Any) -> dict:
if hasattr(event, "model_dump"):
return event.model_dump()
elif isinstance(event, dict):
return dict(event)
elif hasattr(event, "__dict__"):
return dict(event.__dict__)
else:
return {"_value": event}
class TypedChannel(Generic[T]):
    """A named channel layered on top of a ``Net`` bus.

    Outgoing events are converted to a dict with ``_to_dict``, tagged with a
    ``"_channel"`` key holding the channel name, serialised to JSON and handed
    to the bus. Subscriptions default to a filter on that same key.
    """

    def __init__(
        self,
        bus: Net,
        name: str,
        model: Optional[type] = None,
        parse: Optional[Callable[[str], T]] = None,
    ) -> None:
        """Bind the channel to a bus.

        Args:
            bus: Bus used for publishing and handed to the streams created by
                ``subscribe`` / ``subscribe_raw``.
            name: Channel name; written into every payload under "_channel".
            model: Optional callable/class invoked as ``model(**fields)`` to
                build each received event. Ignored when ``parse`` is given.
            parse: Optional function turning the raw JSON string into an
                event. Takes precedence over ``model``.
        """
        self._bus = bus
        self._name = name
        self._model = model
        self._parse = parse
        # Default subscription filter, built once per channel.
        # NOTE(review): presumably the bus matches events whose "_channel"
        # field equals `name`; the filter semantics live in the bus, not here.
        self._filter = json.dumps({"path": "_channel", "value": name})

    @property
    def name(self) -> str:
        """The channel name given at construction."""
        return self._name

    def publish(self, event: T) -> None:
        """Serialise ``event`` for this channel and pass it to ``bus.ingest_raw``."""
        self._bus.ingest_raw(self._encode(event))

    def publish_batch(self, events: list[T]) -> int:
        """Serialise every event and pass them to ``bus.ingest_raw_batch`` in one call.

        All payloads are encoded before the bus is called, so an event that
        cannot be serialised aborts the batch before anything is sent.

        Returns:
            Whatever ``bus.ingest_raw_batch`` returns (presumably the number
            of ingested events; confirm against the bus API).
        """
        return self._bus.ingest_raw_batch([self._encode(event) for event in events])

    def subscribe(self, opts: Optional[SubscribeOpts] = None) -> TypedEventStream[T]:
        """Open a stream whose raw events are converted by this channel's parser.

        Args:
            opts: Optional subscription options. They are copied, not mutated;
                when their ``filter`` is ``None`` the channel filter is used.
        """
        merged = self._scoped_opts(opts)
        return TypedEventStream(self._bus, self._make_parser(), merged)

    def subscribe_raw(self, opts: Optional[SubscribeOpts] = None) -> EventStream:
        """Open an unparsed ``EventStream`` using the same option handling as ``subscribe``."""
        return EventStream(self._bus, self._scoped_opts(opts))

    def _encode(self, event: T) -> str:
        """Return the JSON payload for ``event``, tagged with this channel's name."""
        data = _to_dict(event)
        # Overwrites any "_channel" key the event itself carried.
        data["_channel"] = self._name
        return json.dumps(data)

    def _scoped_opts(self, opts: Optional[SubscribeOpts]) -> SubscribeOpts:
        """Copy ``opts`` (or create defaults) and fill in the channel filter if unset."""
        merged = SubscribeOpts() if opts is None else replace(opts)
        # A filter supplied by the caller is kept as-is and replaces the
        # channel filter rather than being combined with it.
        if merged.filter is None:
            merged.filter = self._filter
        return merged

    @staticmethod
    def _decode(raw: str) -> Any:
        """Parse a raw JSON event and drop the "_channel" tag if present."""
        data = json.loads(raw)
        data.pop("_channel", None)
        return data

    def _make_parser(self) -> Callable[[str], T]:
        """Pick the raw-string parser: custom ``parse``, then ``model``, then plain dict."""
        if self._parse is not None:
            return self._parse

        # Captured now so the stream keeps using the model that was
        # configured when the subscription was created.
        model = self._model
        if model is not None:

            def parse_model(raw: str) -> T:
                return model(**TypedChannel._decode(raw))

            return parse_model

        # No parser and no model: hand back the decoded dict itself.
        return TypedChannel._decode