from __future__ import annotations
import ctypes
import os
import sys
from pathlib import Path
from typing import Optional
# Names exported by ``from <this module> import *``; helpers whose names start
# with an underscore (``_bind``, ``_get_lib``, ...) are intentionally left out.
__all__ = [
"Runtime",
"Writer",
"Reader",
"DomainParticipantFactory",
"ZeroDdsError",
"load_library",
]
def _platform_libname() -> str:
    """Return the shared-library file name to look for on this platform.

    ``sys.platform`` values ``"darwin"`` and ``"win32"`` have dedicated
    names; every other platform falls back to ``libzerodds.so``.
    """
    special_cases = {
        "darwin": "libzerodds.dylib",
        "win32": "zerodds.dll",
    }
    return special_cases.get(sys.platform, "libzerodds.so")
def load_library() -> ctypes.CDLL:
    """Locate and load the native library, returning the ``ctypes.CDLL``.

    Lookup order (first hit wins):

    1. The path in the ``ZERODDS_LIB`` environment variable, if it is set
       and non-empty. It is handed to ``ctypes.CDLL`` without checking
       that it exists.
    2. ``_lib/<name>`` next to this file.
    3. ``target/debug/<name>`` then ``target/release/<name>`` under each of
       the eight directories obtained by repeatedly taking the parent of
       this file's directory, nearest first.
    4. The bare file name, leaving resolution to the system loader.

    Raises:
        OSError: propagated from ``ctypes.CDLL`` when loading fails.
    """
    lib_name = _platform_libname()

    override = os.environ.get("ZERODDS_LIB")
    if override:
        return ctypes.CDLL(override)

    pkg_dir = Path(__file__).resolve().parent
    packaged = pkg_dir / "_lib" / lib_name
    if packaged.exists():
        return ctypes.CDLL(str(packaged))

    # Collect the eight ancestors of this file's directory, nearest first.
    ancestors = []
    level = pkg_dir
    for _ in range(8):
        level = level.parent
        ancestors.append(level)

    # Lazily enumerate the build-output candidates in priority order.
    build_outputs = (
        ancestor / profile / lib_name
        for ancestor in ancestors
        for profile in ("target/debug", "target/release")
    )
    found = next((path for path in build_outputs if path.exists()), None)
    if found is not None:
        return ctypes.CDLL(str(found))

    return ctypes.CDLL(lib_name)
def _bind(lib: ctypes.CDLL) -> ctypes.CDLL:
    """Declare ``argtypes`` and ``restype`` for every ``zerodds_*`` function.

    Args:
        lib: The loaded library object whose function attributes are
            configured in place.

    Returns:
        The same ``lib`` object, for call chaining.
    """
    # Runtime, writer and reader handles are all declared as ``void *``.
    handle = ctypes.c_void_p
    byte_ptr = ctypes.POINTER(ctypes.c_uint8)

    # (symbol, argtypes, restype) in declaration order.
    signatures = (
        ("zerodds_runtime_create", [ctypes.c_uint32], handle),
        ("zerodds_runtime_destroy", [handle], None),
        (
            "zerodds_runtime_wait_for_peers",
            [handle, ctypes.c_int, ctypes.c_uint64],
            ctypes.c_int,
        ),
        (
            "zerodds_writer_create",
            [handle, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int],
            handle,
        ),
        (
            "zerodds_writer_write",
            [handle, byte_ptr, ctypes.c_size_t],
            ctypes.c_int,
        ),
        (
            "zerodds_writer_wait_for_matched",
            [handle, ctypes.c_int, ctypes.c_uint64],
            ctypes.c_int,
        ),
        ("zerodds_writer_destroy", [handle], None),
        (
            "zerodds_reader_create",
            [handle, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int],
            handle,
        ),
        (
            "zerodds_reader_take",
            [handle, ctypes.POINTER(byte_ptr), ctypes.POINTER(ctypes.c_size_t)],
            ctypes.c_int,
        ),
        (
            "zerodds_reader_wait_for_matched",
            [handle, ctypes.c_int, ctypes.c_uint64],
            ctypes.c_int,
        ),
        ("zerodds_reader_destroy", [handle], None),
        ("zerodds_buffer_free", [byte_ptr, ctypes.c_size_t], None),
        ("zerodds_version", [], ctypes.c_char_p),
    )

    for symbol, arg_types, result_type in signatures:
        entry_point = getattr(lib, symbol)
        entry_point.argtypes = arg_types
        entry_point.restype = result_type
    return lib
# Process-wide cache for the loaded and bound library; filled on first use.
_lib: Optional[ctypes.CDLL] = None


def _get_lib() -> ctypes.CDLL:
    """Return the bound library, loading and binding it on the first call.

    Later calls return the cached object without touching the loader.
    """
    global _lib
    if _lib is not None:
        return _lib
    _lib = _bind(load_library())
    return _lib
class ZeroDdsError(RuntimeError):
    """Error raised by this module when a native ``zerodds_*`` call fails.

    Raised when a create function returns NULL or when a call that reports
    a status code returns a non-zero value.
    """
class Runtime:
    """Wrapper around one native runtime handle created for a domain id.

    The handle is released by ``close()``, which also runs when the object
    is used as a context manager or is finalized.
    """

    def __init__(self, domain_id: int = 0) -> None:
        """Create the native runtime for ``domain_id``.

        Raises:
            ZeroDdsError: if ``zerodds_runtime_create`` returns NULL.
        """
        native = _get_lib()
        handle = native.zerodds_runtime_create(ctypes.c_uint32(domain_id))
        if not handle:
            raise ZeroDdsError(
                f"zerodds_runtime_create returned NULL for domain={domain_id}"
            )
        self._lib = native
        self._ptr = ctypes.c_void_p(handle)
        self._domain_id = domain_id

    @property
    def domain_id(self) -> int:
        """The domain id this runtime was created with."""
        return self._domain_id

    @property
    def raw(self) -> ctypes.c_void_p:
        """The underlying handle as a ``c_void_p`` (NULL once closed)."""
        return self._ptr

    def wait_for_peers(self, min_count: int, timeout_ms: int) -> int:
        """Call ``zerodds_runtime_wait_for_peers`` and return its result.

        The integer is returned unchanged; its meaning is defined by the
        native library.
        """
        wanted = ctypes.c_int(min_count)
        deadline = ctypes.c_uint64(timeout_ms)
        result = self._lib.zerodds_runtime_wait_for_peers(
            self._ptr, wanted, deadline
        )
        return int(result)

    def close(self) -> None:
        """Destroy the native runtime; later calls are no-ops."""
        if not self._ptr:
            return
        self._lib.zerodds_runtime_destroy(self._ptr)
        # A NULL handle marks the runtime as closed.
        self._ptr = ctypes.c_void_p()

    def __enter__(self) -> "Runtime":
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def __del__(self) -> None:
        # Best effort: a finalizer must not raise, and a partially
        # constructed instance has no ``_ptr`` attribute to release.
        try:
            self.close()
        except Exception:
            pass
class Writer:
    """Sends raw byte payloads on a topic through a native writer handle.

    The writer keeps a reference to the ``Runtime`` it was created from
    until ``close()``, so Python cannot finalize (and thereby destroy) that
    runtime while this writer's handle is still open.
    NOTE(review): this assumes the native writer must not outlive its
    runtime; confirm against the native API.

    Attributes:
        topic: The topic name passed to the constructor.
    """

    def __init__(
        self,
        runtime: Runtime,
        topic: str,
        type_name: Optional[str] = None,
        reliable: bool = True,
    ) -> None:
        """Create a native writer on ``runtime`` for ``topic``.

        Args:
            runtime: The runtime whose library and handle are used.
            topic: Topic name, passed to the native call as UTF-8.
            type_name: Type name, passed as UTF-8; defaults to ``topic``.
            reliable: Passed to the native call as ``1`` or ``0``.

        Raises:
            ZeroDdsError: if ``zerodds_writer_create`` returns NULL.
        """
        if type_name is None:
            type_name = topic
        lib = runtime._lib
        ptr = lib.zerodds_writer_create(
            runtime._ptr,
            topic.encode("utf-8"),
            type_name.encode("utf-8"),
            ctypes.c_int(1 if reliable else 0),
        )
        if not ptr:
            raise ZeroDdsError(
                f"zerodds_writer_create failed for topic={topic!r}"
            )
        self._lib = lib
        self._ptr = ctypes.c_void_p(ptr)
        # Hold the runtime so it is not garbage-collected before this writer.
        self._runtime = runtime
        self.topic = topic

    def _handle(self) -> ctypes.c_void_p:
        """Return the open handle, or raise instead of passing NULL on."""
        ptr = self._ptr
        if not ptr:
            raise ZeroDdsError(f"writer for topic={self.topic!r} is closed")
        return ptr

    def write(self, payload: bytes) -> None:
        """Send ``payload`` through ``zerodds_writer_write``.

        The bytes are copied into a temporary ctypes array for the call.

        Raises:
            ZeroDdsError: if the writer is closed or the native call
                returns a non-zero status.
        """
        ptr = self._handle()
        buf_t = ctypes.c_uint8 * len(payload)
        buf = buf_t.from_buffer_copy(payload)
        rc = self._lib.zerodds_writer_write(
            ptr,
            ctypes.cast(buf, ctypes.POINTER(ctypes.c_uint8)),
            ctypes.c_size_t(len(payload)),
        )
        if rc != 0:
            raise ZeroDdsError(f"zerodds_writer_write rc={rc}")

    def wait_for_matched(self, min_count: int = 1, timeout_ms: int = 5000) -> None:
        """Call ``zerodds_writer_wait_for_matched`` with the given limits.

        Raises:
            ZeroDdsError: if the writer is closed or the native call
                returns a non-zero status.
        """
        ptr = self._handle()
        rc = self._lib.zerodds_writer_wait_for_matched(
            ptr, ctypes.c_int(min_count), ctypes.c_uint64(timeout_ms)
        )
        if rc != 0:
            raise ZeroDdsError(
                f"writer wait_for_matched(min={min_count}, "
                f"timeout_ms={timeout_ms}) rc={rc}"
            )

    def close(self) -> None:
        """Destroy the native writer; later calls are no-ops."""
        if self._ptr:
            self._lib.zerodds_writer_destroy(self._ptr)
            # A NULL handle marks the writer as closed.
            self._ptr = ctypes.c_void_p()
        # The runtime no longer needs to be kept alive on our behalf.
        self._runtime = None

    def __enter__(self) -> "Writer":
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def __del__(self) -> None:
        # Best effort: a finalizer must not raise, and a partially
        # constructed instance has no ``_ptr`` attribute to release.
        try:
            self.close()
        except Exception:
            pass
class Reader:
    """Receives raw byte payloads on a topic through a native reader handle.

    The reader keeps a reference to the ``Runtime`` it was created from
    until ``close()``, so Python cannot finalize (and thereby destroy) that
    runtime while this reader's handle is still open.
    NOTE(review): this assumes the native reader must not outlive its
    runtime; confirm against the native API.

    Attributes:
        topic: The topic name passed to the constructor.
    """

    def __init__(
        self,
        runtime: Runtime,
        topic: str,
        type_name: Optional[str] = None,
        reliable: bool = True,
    ) -> None:
        """Create a native reader on ``runtime`` for ``topic``.

        Args:
            runtime: The runtime whose library and handle are used.
            topic: Topic name, passed to the native call as UTF-8.
            type_name: Type name, passed as UTF-8; defaults to ``topic``.
            reliable: Passed to the native call as ``1`` or ``0``.

        Raises:
            ZeroDdsError: if ``zerodds_reader_create`` returns NULL.
        """
        if type_name is None:
            type_name = topic
        lib = runtime._lib
        ptr = lib.zerodds_reader_create(
            runtime._ptr,
            topic.encode("utf-8"),
            type_name.encode("utf-8"),
            ctypes.c_int(1 if reliable else 0),
        )
        if not ptr:
            raise ZeroDdsError(
                f"zerodds_reader_create failed for topic={topic!r}"
            )
        self._lib = lib
        self._ptr = ctypes.c_void_p(ptr)
        # Hold the runtime so it is not garbage-collected before this reader.
        self._runtime = runtime
        self.topic = topic

    def _handle(self) -> ctypes.c_void_p:
        """Return the open handle, or raise instead of passing NULL on."""
        ptr = self._ptr
        if not ptr:
            raise ZeroDdsError(f"reader for topic={self.topic!r} is closed")
        return ptr

    def take(self) -> Optional[bytes]:
        """Take one sample via ``zerodds_reader_take``.

        Returns:
            A copy of the sample bytes, or ``None`` when the call succeeds
            but yields a NULL buffer or a zero length.

        Raises:
            ZeroDdsError: if the reader is closed or the native call
                returns a non-zero status.
        """
        ptr = self._handle()
        out_buf = ctypes.POINTER(ctypes.c_uint8)()
        out_len = ctypes.c_size_t(0)
        rc = self._lib.zerodds_reader_take(
            ptr, ctypes.byref(out_buf), ctypes.byref(out_len)
        )
        if rc != 0:
            raise ZeroDdsError(f"zerodds_reader_take rc={rc}")
        # NOTE(review): a non-NULL buffer with length 0 is returned as None
        # without calling zerodds_buffer_free; confirm the native side never
        # hands out such a buffer, otherwise it would not be released here.
        if not out_buf or out_len.value == 0:
            return None
        try:
            # View the buffer as a fixed-size array so bytes() copies exactly
            # out_len bytes before the buffer is handed back.
            sized = ctypes.cast(
                out_buf, ctypes.POINTER(ctypes.c_uint8 * out_len.value)
            )
            return bytes(sized[0])
        finally:
            self._lib.zerodds_buffer_free(out_buf, out_len)

    def take_all(self, max_samples: int = 16) -> list[bytes]:
        """Call ``take()`` up to ``max_samples`` times, stopping at ``None``.

        Returns:
            The samples obtained, in the order they were taken.
        """
        out: list[bytes] = []
        for _ in range(max_samples):
            sample = self.take()
            if sample is None:
                break
            out.append(sample)
        return out

    def wait_for_matched(self, min_count: int = 1, timeout_ms: int = 5000) -> None:
        """Call ``zerodds_reader_wait_for_matched`` with the given limits.

        Raises:
            ZeroDdsError: if the reader is closed or the native call
                returns a non-zero status.
        """
        ptr = self._handle()
        rc = self._lib.zerodds_reader_wait_for_matched(
            ptr, ctypes.c_int(min_count), ctypes.c_uint64(timeout_ms)
        )
        if rc != 0:
            raise ZeroDdsError(
                f"reader wait_for_matched(min={min_count}, "
                f"timeout_ms={timeout_ms}) rc={rc}"
            )

    def close(self) -> None:
        """Destroy the native reader; later calls are no-ops."""
        if self._ptr:
            self._lib.zerodds_reader_destroy(self._ptr)
            # A NULL handle marks the reader as closed.
            self._ptr = ctypes.c_void_p()
        # The runtime no longer needs to be kept alive on our behalf.
        self._runtime = None

    def __enter__(self) -> "Reader":
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def __del__(self) -> None:
        # Best effort: a finalizer must not raise, and a partially
        # constructed instance has no ``_ptr`` attribute to release.
        try:
            self.close()
        except Exception:
            pass
class DomainParticipantFactory:
    """Factory that creates ``Runtime`` objects, one per requested domain.

    ``instance()`` returns a shared factory object; constructing the class
    directly still works and yields an independent, equally usable factory.
    """

    # Cached singleton, stored per class so a subclass gets its own instance.
    _instance: Optional["DomainParticipantFactory"] = None

    @classmethod
    def instance(cls) -> "DomainParticipantFactory":
        """Return the shared factory for ``cls``, creating it on first use.

        Repeated calls return the same object.
        """
        # Look only in this class's own namespace so that a subclass does
        # not pick up a parent's cached instance.
        shared = cls.__dict__.get("_instance")
        if shared is None:
            shared = cls()
            cls._instance = shared
        return shared

    def create_participant(self, domain_id: int = 0) -> Runtime:
        """Create and return a new ``Runtime`` for ``domain_id``.

        Any exception raised by the ``Runtime`` constructor propagates.
        """
        return Runtime(domain_id=domain_id)