import sys
import time
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parent
PROJECT_ROOT = MODULE_PATH.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from python import liner
from _support import ( ensure_release_lib,
free_port,
get_connection_key,
log,
register_peer_catalog,
pending_messages_count,
postgres_session,
wait_until,
)
def main() -> int:
    """Run the simple offline-delivery scenario against a Postgres session.

    A sender client is started and sends one payload to the listener's topic
    before any listener client has been created. The listener client is then
    started and the script waits until its receive callback has seen data,
    which must equal the payload that was sent. After both clients are
    closed, the pending-message count for the sender/listener connection is
    polled until it reaches zero (skipped when no connection key is found).

    Returns:
        0 when every step succeeds.

    Raises:
        AssertionError: if a client's ``run`` or the sender's ``send_to``
            returns a falsy value, or if the received payload differs from
            the one sent.

    Exceptions raised by the helpers (for example by ``wait_until``) propagate
    unchanged; both clients are closed before they do.
    """
    liner.loadLib(str(ensure_release_lib()))
    with postgres_session() as url:
        sender_name, sender_topic = "sender_simple", "topic_sender_simple"
        listener_name, listener_topic = "listener_simple", "topic_listener_simple"
        sender_addr = f"127.0.0.1:{free_port()}"
        listener_addr = f"127.0.0.1:{free_port()}"
        register_peer_catalog(url, [(listener_topic, listener_addr, listener_name)])

        payload = b"offline_simple"
        # Mutable holder so the callback can hand the data back to this scope.
        received = {"data": None}

        def on_recv(_to: str, _from: str, data: bytes) -> None:
            received["data"] = data
            log(f"[listener] recv from={_from} data={data!r}")

        sender = liner.Client.new_postgres(sender_name, sender_topic, sender_addr, url)
        listener = None
        try:
            sender.clear_stored_messages()
            sender.clear_addresses_of_topic()
            # The calls below have side effects, so they must not live inside
            # `assert` statements: `python -O` strips asserts and would skip
            # the calls themselves. AssertionError is still the failure type.
            if not sender.run(lambda _to, _from, _data: None):
                raise AssertionError("sender failed to run")
            sender.refresh_address_topic(listener_topic)

            log(f"[sender] send_to {listener_topic} while offline payload={payload!r}")
            # NOTE(review): the third positional argument is passed as True
            # exactly as before; its meaning is defined by the liner API.
            if not sender.send_to(listener_topic, payload, True):
                raise AssertionError("send_to failed")

            # The listener is created only after the send, so the message was
            # sent while no listener client existed in this process.
            listener = liner.Client.new_postgres(
                listener_name, listener_topic, listener_addr, url
            )
            if not listener.run(on_recv):
                raise AssertionError("listener failed to run")

            wait_until(
                lambda: received["data"] is not None,
                timeout_s=25.0,
                what="listener receive after offline",
            )
            if received["data"] != payload:
                raise AssertionError(f"unexpected payload: {received['data']!r}")
        finally:
            # Close the clients on failure as well as on success, in the same
            # order as the success path (listener first, then sender).
            try:
                if listener is not None:
                    listener.close()
            finally:
                sender.close()

        time.sleep(0.25)
        ck = get_connection_key(url, sender_name, sender_topic, listener_name)
        if ck is not None:
            wait_until(
                lambda: pending_messages_count(url, ck) == 0,
                timeout_s=25.0,
                what="postgres queue drain",
            )
    log("OK offline_delivery_simple (postgres)")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())