liner_broker 1.3.1

Redis based message serverless broker.
Documentation
#!/usr/bin/python3
# -*- coding: utf-8 -*-

import sys
import time
from pathlib import Path

MODULE_PATH = Path(__file__).resolve().parent
PROJECT_ROOT = MODULE_PATH.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from python import liner  # noqa: E402

from _support import (  # noqa: E402
    ensure_release_lib,
    free_port,
    get_connection_key,
    log,
    register_peer_catalog,
    pending_messages_count,
    postgres_session,
    wait_until,
)


def main() -> int:
    liner.loadLib(str(ensure_release_lib()))
    with postgres_session() as url:
        sender_name, sender_topic = "sender_simple", "topic_sender_simple"
        listener_name, listener_topic = "listener_simple", "topic_listener_simple"

        sender_addr = f"127.0.0.1:{free_port()}"
        listener_addr = f"127.0.0.1:{free_port()}"

        register_peer_catalog(url, [(listener_topic, listener_addr, listener_name)])
        s = liner.Client.new_postgres(sender_name, sender_topic, sender_addr, url)
        s.clear_stored_messages()
        s.clear_addresses_of_topic()

        assert s.run(lambda _to, _from, _data: None), "sender failed to run"
        s.refresh_address_topic(listener_topic)

        payload = b"offline_simple"
        log(f"[sender] send_to {listener_topic} while offline payload={payload!r}")
        assert s.send_to(listener_topic, payload, True), "send_to failed"

        got = {"data": None}

        def on_recv(_to: str, _from: str, data: bytes):
            got["data"] = data
            log(f"[listener] recv from={_from} data={data!r}")

        l = liner.Client.new_postgres(listener_name, listener_topic, listener_addr, url)
        assert l.run(on_recv), "listener failed to run"

        wait_until(lambda: got["data"] is not None, timeout_s=25.0, what="listener receive after offline")
        assert got["data"] == payload, f"unexpected payload: {got['data']!r}"

        l.close()
        s.close()
        time.sleep(0.25)
        ck = get_connection_key(url, sender_name, sender_topic, listener_name)
        if ck is not None:
            wait_until(
                lambda: pending_messages_count(url, ck) == 0,
                timeout_s=25.0,
                what="postgres queue drain",
            )
        log("OK offline_delivery_simple (postgres)")

        return 0


if __name__ == "__main__":
    raise SystemExit(main())