from __future__ import annotations
from dataclasses import dataclass
from typing import List, Literal, Optional
from net import ( NetMesh as _NetMesh,
BackpressureError,
NotConnectedError,
)
# Accepted values for the ``reliability`` keyword of ``MeshNode.open_stream``;
# the chosen string is handed to the native ``open_stream`` call unchanged.
Reliability = Literal["fire_and_forget", "reliable"]
@dataclass(frozen=True)
class StreamStats:
    """Immutable snapshot of one stream's counters.

    ``MeshNode.stream_stats`` builds an instance by copying each field from
    the same-named attribute of the native stats object.  The field order
    below is part of the positional constructor signature, so it must not
    be rearranged.

    NOTE(review): the precise meaning and units of each counter are defined
    by the native layer and are not visible here; the grouping comments
    below are inferred from the field names only -- confirm before relying
    on them.
    """

    # Sequence counters (presumably last transmitted / received sequence).
    tx_seq: int
    rx_seq: int

    # Receive-side queue depth and liveness information.
    inbound_pending: int
    last_activity_ns: int  # ``_ns`` suffix suggests nanoseconds -- confirm
    active: bool

    # Flow-control accounting.
    backpressure_events: int
    tx_credit_remaining: int
    tx_window: int
    credit_grants_received: int
    credit_grants_sent: int
class MeshStream:
    """Handle that pairs a (peer, stream) id with its native stream object.

    ``MeshNode.open_stream`` creates these, and the ``MeshNode.send_*``
    methods read the private ``_native`` attribute back out.  ``__slots__``
    restricts instances to exactly the three attributes set in ``__init__``.
    """

    __slots__ = ("peer_node_id", "stream_id", "_native")

    def __init__(self, peer_node_id: int, stream_id: int, native: object) -> None:
        """Store the identifying ids and the opaque native handle as given."""
        self._native = native
        self.stream_id = stream_id
        self.peer_node_id = peer_node_id

    def __repr__(self) -> str:
        """Return a debug string showing both ids as ``0x``-prefixed hex."""
        peer_hex = format(self.peer_node_id, "#x")
        stream_hex = format(self.stream_id, "#x")
        return f"MeshStream(peer_node_id={peer_hex}, stream_id={stream_hex})"
class MeshNode:
    """Thin Python facade over the native ``NetMesh`` object.

    Every method forwards its arguments to the same-named member of the
    wrapped native instance.  The only Python-side work is wrapping opened
    streams in ``MeshStream`` and copying native stats into ``StreamStats``.
    The node is also a context manager: leaving the ``with`` block calls
    ``shutdown``.

    NOTE(review): errors are not caught here, so whatever the native layer
    raises propagates to the caller -- presumably ``BackpressureError`` /
    ``NotConnectedError`` on the send paths; confirm against the native
    bindings.
    """

    def __init__(
        self,
        bind_addr: str,
        psk: str,
        *,
        heartbeat_interval_ms: Optional[int] = None,
        session_timeout_ms: Optional[int] = None,
        num_shards: Optional[int] = None,
    ) -> None:
        """Construct the underlying native mesh.

        ``bind_addr`` and ``psk`` are passed positionally; the three tuning
        options are keyword-only and are always forwarded, including when
        they are ``None``.
        """
        tuning = {
            "heartbeat_interval_ms": heartbeat_interval_ms,
            "session_timeout_ms": session_timeout_ms,
            "num_shards": num_shards,
        }
        self._native = _NetMesh(bind_addr, psk, **tuning)

    @property
    def public_key(self) -> str:
        """The native node's ``public_key`` attribute."""
        backend = self._native
        return backend.public_key

    @property
    def node_id(self) -> int:
        """The native node's ``node_id`` attribute."""
        backend = self._native
        return backend.node_id

    def connect(self, peer_addr: str, peer_public_key: str, peer_node_id: int) -> None:
        """Forward a connect request for the given peer to the native mesh."""
        backend = self._native
        backend.connect(peer_addr, peer_public_key, peer_node_id)

    def accept(self, peer_node_id: int) -> str:
        """Forward an accept request and return the native call's result."""
        backend = self._native
        return backend.accept(peer_node_id)

    def start(self) -> None:
        """Invoke ``start`` on the native mesh."""
        backend = self._native
        backend.start()

    def peer_count(self) -> int:
        """Return the value reported by the native ``peer_count`` call."""
        backend = self._native
        return backend.peer_count()

    def open_stream(
        self,
        peer_node_id: int,
        stream_id: int,
        *,
        reliability: Reliability = "fire_and_forget",
        window_bytes: Optional[int] = None,
        fairness_weight: int = 1,
    ) -> MeshStream:
        """Open a stream on the native mesh and wrap it in a ``MeshStream``.

        ``reliability`` and ``fairness_weight`` are always forwarded as
        keywords.  ``window_bytes`` is forwarded only when it is not
        ``None``, so the native side applies its own default otherwise.
        """
        # Omit the keyword entirely (rather than passing None) when unset.
        optional = {} if window_bytes is None else {"window_bytes": window_bytes}
        handle = self._native.open_stream(
            peer_node_id,
            stream_id,
            reliability=reliability,
            fairness_weight=fairness_weight,
            **optional,
        )
        return MeshStream(peer_node_id, stream_id, handle)

    def close_stream(self, peer_node_id: int, stream_id: int) -> None:
        """Forward a close request for the (peer, stream) pair."""
        backend = self._native
        backend.close_stream(peer_node_id, stream_id)

    def send_on_stream(self, stream: MeshStream, events: List[bytes]) -> None:
        """Pass *events* to the native ``send_on_stream`` for this stream."""
        handle = stream._native
        self._native.send_on_stream(handle, events)

    def send_with_retry(
        self,
        stream: MeshStream,
        events: List[bytes],
        max_retries: int = 8,
    ) -> None:
        """Pass *events* and *max_retries* to the native ``send_with_retry``.

        ``max_retries`` is handed over positionally as the third argument.
        """
        handle = stream._native
        self._native.send_with_retry(handle, events, max_retries)

    def send_blocking(self, stream: MeshStream, events: List[bytes]) -> None:
        """Pass *events* to the native ``send_blocking`` for this stream."""
        handle = stream._native
        self._native.send_blocking(handle, events)

    def stream_stats(self, peer_node_id: int, stream_id: int) -> Optional[StreamStats]:
        """Return a ``StreamStats`` copy of the native stats, or ``None``.

        ``None`` is returned exactly when the native call returns ``None``.
        """
        snapshot = self._native.stream_stats(peer_node_id, stream_id)
        if snapshot is None:
            return None
        # Attribute names on the native object mirror the StreamStats fields.
        field_names = (
            "tx_seq",
            "rx_seq",
            "inbound_pending",
            "last_activity_ns",
            "active",
            "backpressure_events",
            "tx_credit_remaining",
            "tx_window",
            "credit_grants_received",
            "credit_grants_sent",
        )
        return StreamStats(**{name: getattr(snapshot, name) for name in field_names})

    def shutdown(self) -> None:
        """Invoke ``shutdown`` on the native mesh."""
        backend = self._native
        backend.shutdown()

    def __enter__(self) -> "MeshNode":
        """Enter the context manager; yields this node itself."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Shut the node down on exit; exceptions are not suppressed."""
        self.shutdown()
# Public API of this module. ``BackpressureError`` and ``NotConnectedError``
# are imported from ``net`` at the top of the file and re-exported here.
__all__ = [
"MeshNode",
"MeshStream",
"StreamStats",
"Reliability",
"BackpressureError",
"NotConnectedError",
]