from __future__ import annotations
import datetime
import json
import shutil
import socket
import sqlite3
import subprocess
import tempfile
import time
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parent
PROJECT_ROOT = MODULE_PATH.parent.parent
def peer_catalog_json(peers: list[tuple[str, str, str]]) -> str:
return json.dumps(
[{"topic": t, "addr": a, "client_name": n} for (t, a, n) in peers],
separators=(",", ":"),
)
def log(msg: str) -> None:
now = datetime.datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S") + f".{int(now.microsecond / 1000):03d}"
print(f"[{ts}] {msg}", flush=True)
def wait_until(pred, timeout_s: float, sleep_s: float = 0.05, what: str = "condition") -> None:
deadline = time.time() + timeout_s
while time.time() < deadline:
if pred():
return
time.sleep(sleep_s)
raise TimeoutError(f"timeout waiting for {what}")
def free_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
s.close()
return int(port)
def ensure_release_lib() -> Path:
lib_path = PROJECT_ROOT / "target" / "release" / "libliner_broker.so"
if lib_path.exists():
return lib_path
subprocess.run(["cargo", "build", "--release"], cwd=str(PROJECT_ROOT), check=True)
if not lib_path.exists():
raise RuntimeError(f"release library not found at {lib_path}")
return lib_path
def composite(sender_name: str, sender_topic: str, listener_name: str) -> str:
return f"{sender_name}:{sender_topic}:{listener_name}"
def _connect_reader(db_path: str) -> sqlite3.Connection:
return sqlite3.connect(db_path, timeout=30.0, isolation_level=None)
def get_connection_key(db_path: str, sender_name: str, sender_topic: str, listener_name: str) -> int | None:
comp = composite(sender_name, sender_topic, listener_name)
con = _connect_reader(db_path)
try:
cur = con.execute(
"SELECT connection_key FROM conn_key_map WHERE composite = ?",
(comp,),
)
row = cur.fetchone()
return int(row[0]) if row else None
finally:
con.close()
def pending_messages_count(db_path: str, connection_key: int) -> int:
con = _connect_reader(db_path)
try:
cur = con.execute(
"SELECT COUNT(*) FROM conn_messages WHERE connection_key = ?",
(connection_key,),
)
row = cur.fetchone()
return int(row[0]) if row else 0
finally:
con.close()
def temp_shared_db(prefix: str = "liner_sqlite_it_") -> tuple[str, callable]:
d = tempfile.mkdtemp(prefix=prefix)
db_path = str(Path(d) / "shared.sqlite")
def cleanup() -> None:
shutil.rmtree(d, ignore_errors=True)
return db_path, cleanup