from __future__ import annotations
import json
from typing import Any, Iterator, Literal, Optional, Union
try:
from net import ( DeckAdminCommands as _RawAdmin,
DeckClient as _RawClient,
DeckSdkError,
DeckSnapshotStream as _RawSnapshotStream,
DeckStatusSummaryStream as _RawStatusStream,
OperatorIdentity,
deck_sdk_error_kind,
)
try:
from net import ( AuditQuery as _RawAuditQuery,
AuditStream as _RawAuditStream,
FailureStream as _RawFailureStream,
LogStream as _RawLogStream,
)
_HAS_SLICE_2 = True
except ImportError: _HAS_SLICE_2 = False
try:
from net import ( DeckAdminVerifier as AdminVerifier,
DeckOperatorRegistry as OperatorRegistry,
)
_HAS_VERIFIER = True
except ImportError: _HAS_VERIFIER = False
except ImportError as e: raise ImportError(
"Deck SDK symbols not present in `net._net`. Rebuild the "
"wheel with `--features deck`, e.g. `maturin develop "
"--features deck`."
) from e
# Names re-exported by `from <this module> import *`.
__all__ = [
    # Client, admin commands and snapshot/status streams.
    "DeckClient",
    "AdminCommands",
    "SnapshotStream",
    "StatusSummaryStream",
    # Symbols passed through unchanged from the native `net` package.
    "DeckSdkError",
    "OperatorIdentity",
    "deck_sdk_error_kind",
    # Typed-dict shapes for commits and status summaries.
    "ChainCommit",
    "StatusSummary",
    # Audit, log and failure surfaces.
    "AuditQuery",
    "LogStream",
    "FailureStream",
    "LogRecord",
    "FailureRecord",
    "AdminAuditRecord",
    "LogLevel",
    "LogFilter",
    # ICE proposal surfaces and their supporting types.
    "IceCommands",
    "IceProposal",
    "SimulatedIceProposal",
    "OperatorSignature",
    "BlastRadius",
    "AvoidScope",
]

# The verifier aliases only exist when the guarded import above succeeded,
# so they are advertised conditionally.
if _HAS_VERIFIER:
    __all__.extend(["OperatorRegistry", "AdminVerifier"])
from typing import TypedDict
class ChainCommit(TypedDict):
    """Dictionary shape used as the return annotation of admin and ICE commit calls.

    In this module it annotates every ``AdminCommands`` method and
    ``SimulatedIceProposal.commit``.  The dictionaries themselves come from
    the raw binding, so the field notes below are inferred from names only.
    """

    commit_id: int  # presumably identifies the resulting commit -- TODO confirm
    operator_id: int  # presumably the operator that issued the command -- TODO confirm
    event_kind: str  # presumably a label for the committed event -- TODO confirm
    committed_at_ms: int  # presumably a timestamp in milliseconds -- TODO confirm epoch
class PeerCounts(TypedDict):
    """Integer counters keyed by peer state; used as ``StatusSummary["peers"]``.

    NOTE(review): each value is presumably the number of peers currently in
    the named state -- confirm against the producer in the raw binding.
    """

    healthy: int
    degraded: int
    unreachable: int
    unknown: int
class DaemonCounts(TypedDict):
    """Integer counters keyed by daemon state; used as ``StatusSummary["daemons"]``.

    NOTE(review): each value is presumably the number of daemons currently
    in the named lifecycle state -- confirm against the raw binding.
    """

    running: int
    starting: int
    stopping: int
    stopped: int
    backing_off: int
    crash_looping: int
class StatusSummary(TypedDict):
    """Dictionary shape annotated on ``DeckClient.status_summary`` and on
    the items yielded by ``StatusSummaryStream``.

    The values are produced by the raw binding; the per-field notes are
    inferred from the field names and should be confirmed there.
    """

    peers: PeerCounts  # nested per-state peer counters
    daemons: DaemonCounts  # nested per-state daemon counters
    replica_chains: int
    avoid_list_entries: int
    recently_emitted_count: int
    recent_failure_count: int
    admin_audit_ring_depth: int
    # May be None; presumably that means no freeze is in effect -- TODO confirm.
    freeze_remaining_ms: Optional[int]
    local_maintenance_active: bool
# Closed set of level names; used to type `LogFilter.min_level` and `LogRecord.level`.
LogLevel = Literal["trace", "debug", "info", "warn", "error"]
class LogFilter(TypedDict, total=False):
    """Optional filter accepted by ``DeckClient.subscribe_logs``.

    Declared with ``total=False``, so every key may be omitted.  The filter
    is copied into a plain ``dict`` and handed to the raw client, which is
    where the actual filtering semantics live.
    """

    min_level: LogLevel  # presumably the lowest level to deliver -- TODO confirm
    daemon_id: int  # presumably restricts records to one daemon -- TODO confirm
    node_id: int  # presumably restricts records to one node -- TODO confirm
    since_seq: int  # presumably a starting sequence number -- TODO confirm inclusivity
class LogRecord(TypedDict):
    """Dictionary shape annotated on the items yielded by ``LogStream``.

    Records are passed through from the raw stream untouched; field notes
    are inferred from names only.
    """

    seq: int  # presumably a sequence number -- TODO confirm ordering guarantees
    ts_ms: int  # presumably a timestamp in milliseconds -- TODO confirm epoch
    level: LogLevel
    daemon_id: Optional[int]  # may be None
    node_id: Optional[int]  # may be None
    message: str
class FailureRecord(TypedDict):
    """Dictionary shape annotated on the items yielded by ``FailureStream``.

    Records are passed through from the raw stream untouched; field notes
    are inferred from names only.
    """

    seq: int  # presumably a sequence number -- TODO confirm
    source: str
    reason: str
    recorded_at_ms: int  # presumably a timestamp in milliseconds -- TODO confirm epoch
# Audit records are exposed as untyped dictionaries: in this module they are
# whatever `json.loads` returns in `AuditQuery.collect` and the audit stream wrapper.
AdminAuditRecord = dict[str, Any]
class AdminCommands:
    """Typed facade over the raw admin-command handle.

    Every method forwards its arguments to the method of the same name on
    the wrapped handle and returns that call's result unchanged; the
    ``ChainCommit`` annotation describes what the raw binding is expected
    to hand back.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: _RawAdmin) -> None:
        """Keep a reference to the raw admin handle."""
        self._raw = raw

    def drain(self, node: int, drain_for_ms: int) -> ChainCommit:
        """Forward ``drain(node, drain_for_ms)`` to the raw handle."""
        commit = self._raw.drain(node, drain_for_ms)
        return commit

    def enter_maintenance(
        self, node: int, drain_for_ms: Optional[int] = None
    ) -> ChainCommit:
        """Forward ``enter_maintenance``; ``drain_for_ms`` is passed by keyword."""
        commit = self._raw.enter_maintenance(node, drain_for_ms=drain_for_ms)
        return commit

    def exit_maintenance(self, node: int) -> ChainCommit:
        """Forward ``exit_maintenance(node)`` to the raw handle."""
        commit = self._raw.exit_maintenance(node)
        return commit

    def cordon(self, node: int) -> ChainCommit:
        """Forward ``cordon(node)`` to the raw handle."""
        commit = self._raw.cordon(node)
        return commit

    def uncordon(self, node: int) -> ChainCommit:
        """Forward ``uncordon(node)`` to the raw handle."""
        commit = self._raw.uncordon(node)
        return commit

    def drop_replicas(self, node: int, chains: list[int]) -> ChainCommit:
        """Forward ``drop_replicas(node, chains)``; ``chains`` is passed as-is."""
        commit = self._raw.drop_replicas(node, chains)
        return commit

    def invalidate_placement(self, node: int) -> ChainCommit:
        """Forward ``invalidate_placement(node)`` to the raw handle."""
        commit = self._raw.invalidate_placement(node)
        return commit

    def restart_all_daemons(self, node: int) -> ChainCommit:
        """Forward ``restart_all_daemons(node)`` to the raw handle."""
        commit = self._raw.restart_all_daemons(node)
        return commit

    def clear_avoid_list(self, node: int) -> ChainCommit:
        """Forward ``clear_avoid_list(node)`` to the raw handle."""
        commit = self._raw.clear_avoid_list(node)
        return commit
class SnapshotStream:
    """Iterator over snapshots decoded from a raw snapshot stream.

    Each item pulled from the wrapped stream is parsed with ``json.loads``
    before being returned.  The stream is also a context manager, so
    ``close()`` runs even when the consuming loop exits early or raises::

        with client.snapshots() as snapshots:
            for snap in snapshots:
                ...
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: _RawSnapshotStream) -> None:
        """Keep a reference to the raw stream object."""
        self._raw = raw

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return ``self``; the stream is its own iterator."""
        return self

    def __next__(self) -> dict[str, Any]:
        """Pull the next item from the raw stream and JSON-decode it.

        Anything raised by ``next()`` on the raw stream (including
        ``StopIteration``) or by ``json.loads`` propagates to the caller.
        """
        return json.loads(next(self._raw))

    def close(self) -> None:
        """Close the underlying raw stream."""
        self._raw.close()

    def __enter__(self) -> "SnapshotStream":
        """Enter a ``with`` block; returns the stream itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the stream on block exit; never suppresses exceptions."""
        self.close()
        return False
class StatusSummaryStream:
    """Iterator over status summaries taken from a raw status stream.

    Items are returned exactly as the wrapped stream produces them.  The
    stream is also a context manager, so ``close()`` runs even when the
    consuming loop exits early or raises.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: _RawStatusStream) -> None:
        """Keep a reference to the raw stream object."""
        self._raw = raw

    def __iter__(self) -> Iterator[StatusSummary]:
        """Return ``self``; the stream is its own iterator."""
        return self

    def __next__(self) -> StatusSummary:
        """Return the next item from the raw stream unchanged.

        Anything raised by ``next()`` on the raw stream, including
        ``StopIteration``, propagates to the caller.
        """
        return next(self._raw)

    def close(self) -> None:
        """Close the underlying raw stream."""
        self._raw.close()

    def __enter__(self) -> "StatusSummaryStream":
        """Enter a ``with`` block; returns the stream itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the stream on block exit; never suppresses exceptions."""
        self.close()
        return False
class LogStream:
    """Iterator over log records taken from a raw log stream.

    Items are returned exactly as the wrapped stream produces them.  The
    stream is also a context manager, so ``close()`` runs even when the
    consuming loop exits early or raises.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw stream object."""
        self._raw = raw

    def __iter__(self) -> Iterator[LogRecord]:
        """Return ``self``; the stream is its own iterator."""
        return self

    def __next__(self) -> LogRecord:
        """Return the next item from the raw stream unchanged.

        Anything raised by ``next()`` on the raw stream, including
        ``StopIteration``, propagates to the caller.
        """
        return next(self._raw)

    def close(self) -> None:
        """Close the underlying raw stream."""
        self._raw.close()

    def __enter__(self) -> "LogStream":
        """Enter a ``with`` block; returns the stream itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the stream on block exit; never suppresses exceptions."""
        self.close()
        return False
class FailureStream:
    """Iterator over failure records taken from a raw failure stream.

    Items are returned exactly as the wrapped stream produces them.  The
    stream is also a context manager, so ``close()`` runs even when the
    consuming loop exits early or raises.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw stream object."""
        self._raw = raw

    def __iter__(self) -> Iterator[FailureRecord]:
        """Return ``self``; the stream is its own iterator."""
        return self

    def __next__(self) -> FailureRecord:
        """Return the next item from the raw stream unchanged.

        Anything raised by ``next()`` on the raw stream, including
        ``StopIteration``, propagates to the caller.
        """
        return next(self._raw)

    def close(self) -> None:
        """Close the underlying raw stream."""
        self._raw.close()

    def __enter__(self) -> "FailureStream":
        """Enter a ``with`` block; returns the stream itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the stream on block exit; never suppresses exceptions."""
        self.close()
        return False
class _AuditStreamWrapper:
    """Iterator over audit records decoded from a raw audit stream.

    Each item pulled from the wrapped stream is parsed with ``json.loads``
    before being returned.  The wrapper is also a context manager, so
    ``close()`` runs even when the consuming loop exits early or raises.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw stream object."""
        self._raw = raw

    def __iter__(self) -> Iterator[AdminAuditRecord]:
        """Return ``self``; the wrapper is its own iterator."""
        return self

    def __next__(self) -> AdminAuditRecord:
        """Pull the next item from the raw stream and JSON-decode it.

        Anything raised by ``next()`` on the raw stream (including
        ``StopIteration``) or by ``json.loads`` propagates to the caller.
        """
        return json.loads(next(self._raw))

    def close(self) -> None:
        """Close the underlying raw stream."""
        self._raw.close()

    def __enter__(self) -> "_AuditStreamWrapper":
        """Enter a ``with`` block; returns the wrapper itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the stream on block exit; never suppresses exceptions."""
        self.close()
        return False
class AuditQuery:
    """Fluent wrapper around a raw audit-query builder.

    Each filter method forwards to the same-named method on the raw
    builder, ignores whatever that call returns, and hands back this
    wrapper so calls can be chained.  ``collect`` and ``stream`` run the
    query and JSON-decode the results.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw query builder."""
        self._raw = raw

    def recent(self, limit: int) -> "AuditQuery":
        """Forward ``recent(limit)`` to the raw builder; returns ``self``."""
        builder = self._raw
        builder.recent(limit)
        return self

    def by_operator(self, operator_id: int) -> "AuditQuery":
        """Forward ``by_operator(operator_id)`` to the raw builder; returns ``self``."""
        builder = self._raw
        builder.by_operator(operator_id)
        return self

    def between(self, start_ms: int, end_ms: int) -> "AuditQuery":
        """Forward ``between(start_ms, end_ms)`` to the raw builder; returns ``self``."""
        builder = self._raw
        builder.between(start_ms, end_ms)
        return self

    def force_only(self) -> "AuditQuery":
        """Forward ``force_only()`` to the raw builder; returns ``self``."""
        builder = self._raw
        builder.force_only()
        return self

    def since(self, seq: int) -> "AuditQuery":
        """Forward ``since(seq)`` to the raw builder; returns ``self``."""
        builder = self._raw
        builder.since(seq)
        return self

    def collect(self) -> list[AdminAuditRecord]:
        """Run the raw ``collect()`` and JSON-decode every returned item."""
        encoded_records = self._raw.collect()
        return list(map(json.loads, encoded_records))

    def stream(self) -> _AuditStreamWrapper:
        """Open the raw ``stream()`` and wrap it so items are JSON-decoded."""
        raw_stream = self._raw.stream()
        return _AuditStreamWrapper(raw_stream)
class DeckClient:
    """Python-facing wrapper around the raw Deck client.

    The wrapper owns a single raw client (``_raw``) and re-exposes its
    operations, JSON-decoding string results and wrapping raw sub-handles
    and streams in the typed classes defined in this module.  It can be
    used as a context manager; leaving the block calls ``close()``.
    """

    __slots__ = ("_raw",)

    def __init__(
        self,
        meshos_sdk: Any,
        identity: OperatorIdentity,
        config: Optional[dict[str, Any]] = None,
    ) -> None:
        """Build a client on top of an existing MeshOS SDK object.

        ``meshos_sdk`` may be a wrapper exposing a ``_raw`` attribute or the
        raw object itself; when ``_raw`` is absent the argument is used as-is.
        """
        sdk_handle = getattr(meshos_sdk, "_raw", meshos_sdk)
        self._raw = _RawClient.from_meshos(sdk_handle, identity, config)

    @classmethod
    def from_seed(
        cls,
        operator_seed: bytes,
        meshos_config: Optional[dict[str, Any]] = None,
        deck_config: Optional[dict[str, Any]] = None,
    ) -> "DeckClient":
        """Alternate constructor: build the raw client straight from a seed.

        ``__init__`` is bypassed on purpose, because it takes a different
        set of arguments; only ``_raw`` is populated.
        """
        client = cls.__new__(cls)
        client._raw = _RawClient(operator_seed, meshos_config, deck_config)
        return client

    def close(self) -> None:
        """Close the underlying raw client."""
        self._raw.close()

    def __enter__(self) -> "DeckClient":
        """Enter a ``with`` block; returns the client itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        """Close the client on block exit; never suppresses exceptions."""
        self.close()
        return False

    def identity(self) -> OperatorIdentity:
        """Return whatever the raw client reports from ``identity()``."""
        operator = self._raw.identity()
        return operator

    @property
    def admin(self) -> AdminCommands:
        """A fresh ``AdminCommands`` wrapper around the raw ``admin`` attribute."""
        raw_admin = self._raw.admin
        return AdminCommands(raw_admin)

    def status(self) -> dict[str, Any]:
        """Fetch the raw status string and JSON-decode it."""
        encoded_status = self._raw.status()
        return json.loads(encoded_status)

    def status_summary(self) -> StatusSummary:
        """Return the raw client's ``status_summary()`` result unchanged."""
        summary = self._raw.status_summary()
        return summary

    def snapshots(self) -> SnapshotStream:
        """Open the raw snapshot stream and wrap it in ``SnapshotStream``."""
        raw_stream = self._raw.snapshots()
        return SnapshotStream(raw_stream)

    def status_summary_stream(self) -> StatusSummaryStream:
        """Open the raw summary stream and wrap it in ``StatusSummaryStream``."""
        raw_stream = self._raw.status_summary_stream()
        return StatusSummaryStream(raw_stream)

    def audit(self) -> AuditQuery:
        """Create a raw audit query and wrap it in ``AuditQuery``."""
        raw_query = self._raw.audit()
        return AuditQuery(raw_query)

    def subscribe_logs(
        self, filter: Optional[LogFilter] = None
    ) -> LogStream:
        """Subscribe to logs, optionally filtered.

        A provided filter is copied into a plain ``dict`` before being
        handed to the raw client; ``None`` is forwarded as ``None``.
        """
        if filter is None:
            raw_filter = None
        else:
            raw_filter = dict(filter)
        raw_stream = self._raw.subscribe_logs(raw_filter)
        return LogStream(raw_stream)

    def subscribe_failures(self, since_seq: int = 0) -> FailureStream:
        """Subscribe to failures, forwarding ``since_seq`` to the raw client."""
        raw_stream = self._raw.subscribe_failures(since_seq)
        return FailureStream(raw_stream)

    @property
    def ice(self) -> "IceCommands":
        """A fresh ``IceCommands`` wrapper around the raw ``ice`` attribute."""
        raw_ice = self._raw.ice
        return IceCommands(raw_ice)

    def __repr__(self) -> str:
        """Delegate to the raw client's ``repr``."""
        return repr(self._raw)
# Variants of the scope argument taken by `IceCommands.flush_avoid_lists`.
# Each variant is a typed dict discriminated by the literal value of "kind";
# "local" additionally carries a "node" and "on_peer" a "peer" (both ints).
AvoidScopeGlobal = TypedDict("AvoidScopeGlobal", {"kind": Literal["global"]})
AvoidScopeLocal = TypedDict("AvoidScopeLocal", {"kind": Literal["local"], "node": int})
AvoidScopeOnPeer = TypedDict("AvoidScopeOnPeer", {"kind": Literal["on_peer"], "peer": int})
# Union of the three variants above.
AvoidScope = Union[AvoidScopeGlobal, AvoidScopeLocal, AvoidScopeOnPeer]
class OperatorSignature(TypedDict):
    """One entry of the ``signatures`` list given to ``SimulatedIceProposal.commit``.

    Each entry is copied into a plain ``dict`` before being handed to the
    raw binding.
    """

    operator_id: int  # presumably the signing operator -- TODO confirm
    signature: bytes  # presumably that operator's signature bytes -- TODO confirm format
# Untyped dictionary returned by `SimulatedIceProposal.blast_radius`, which
# produces it by JSON-decoding the raw binding's result.
BlastRadius = dict[str, Any]
class IceCommands:
    """Typed facade over the raw ICE command handle.

    Every method forwards its arguments to the same-named method on the
    wrapped handle and wraps the result in an ``IceProposal``.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw ICE handle."""
        self._raw = raw

    def freeze_cluster(self, ttl_ms: int) -> "IceProposal":
        """Forward ``freeze_cluster(ttl_ms)`` and wrap the resulting proposal."""
        raw_proposal = self._raw.freeze_cluster(ttl_ms)
        return IceProposal(raw_proposal)

    def flush_avoid_lists(self, scope: AvoidScope) -> "IceProposal":
        """Forward ``flush_avoid_lists`` with ``scope`` copied into a plain dict."""
        plain_scope = dict(scope)
        raw_proposal = self._raw.flush_avoid_lists(plain_scope)
        return IceProposal(raw_proposal)

    def force_evict_replica(self, chain: int, victim: int) -> "IceProposal":
        """Forward ``force_evict_replica(chain, victim)`` and wrap the result."""
        raw_proposal = self._raw.force_evict_replica(chain, victim)
        return IceProposal(raw_proposal)

    def force_restart_daemon(self, id: int, name: str) -> "IceProposal":
        """Forward ``force_restart_daemon(id, name)`` and wrap the result."""
        raw_proposal = self._raw.force_restart_daemon(id, name)
        return IceProposal(raw_proposal)

    def force_cutover(self, chain: int, target: int) -> "IceProposal":
        """Forward ``force_cutover(chain, target)`` and wrap the result."""
        raw_proposal = self._raw.force_cutover(chain, target)
        return IceProposal(raw_proposal)

    def kill_migration(self, migration: int) -> "IceProposal":
        """Forward ``kill_migration(migration)`` and wrap the result."""
        raw_proposal = self._raw.kill_migration(migration)
        return IceProposal(raw_proposal)

    def thaw_cluster(self) -> "IceProposal":
        """Forward ``thaw_cluster()`` and wrap the resulting proposal."""
        raw_proposal = self._raw.thaw_cluster()
        return IceProposal(raw_proposal)
class IceProposal:
    """Wrapper around a raw ICE proposal that has not been simulated yet."""

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw proposal."""
        self._raw = raw

    @property
    def issued_at_ms(self) -> int:
        """The raw proposal's ``issued_at_ms`` attribute, read on each access."""
        issued = self._raw.issued_at_ms
        return issued

    def simulate(self) -> "SimulatedIceProposal":
        """Call the raw ``simulate()`` and wrap its result."""
        raw_simulated = self._raw.simulate()
        return SimulatedIceProposal(raw_simulated)
class SimulatedIceProposal:
    """Wrapper around a raw ICE proposal that has been simulated.

    Exposes the simulation results from the raw object and lets the caller
    commit the proposal with a list of operator signatures.
    """

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        """Keep a reference to the raw simulated proposal."""
        self._raw = raw

    @property
    def issued_at_ms(self) -> int:
        """The raw proposal's ``issued_at_ms`` attribute, read on each access."""
        issued = self._raw.issued_at_ms
        return issued

    def blast_radius(self) -> BlastRadius:
        """Fetch the raw blast-radius payload and JSON-decode it."""
        encoded_radius = self._raw.blast_radius()
        return json.loads(encoded_radius)

    def blast_hash(self) -> bytes:
        """Return the raw ``blast_hash()`` result unchanged."""
        digest = self._raw.blast_hash()
        return digest

    def commit(self, signatures: list[OperatorSignature]) -> ChainCommit:
        """Commit via the raw object, passing each signature as a fresh plain dict."""
        plain_signatures = list(map(dict, signatures))
        return self._raw.commit(plain_signatures)