kitedb 0.2.18

High-performance embedded graph database
Documentation
"""End-to-end replication transport/admin flow validation for Python bindings."""

from __future__ import annotations

import importlib.util
import json
import os
from pathlib import Path
import sys
import tempfile

import pytest

PY_ROOT = Path(__file__).resolve().parents[1]
NATIVE_CANDIDATES = sorted((PY_ROOT / "kitedb").glob("_kitedb*.so"))
if not NATIVE_CANDIDATES:
    raise RuntimeError(f"missing native extension under {PY_ROOT / 'kitedb'}")

NATIVE_PATH = NATIVE_CANDIDATES[0]
NATIVE_SPEC = importlib.util.spec_from_file_location("_kitedb", NATIVE_PATH)
if NATIVE_SPEC is None or NATIVE_SPEC.loader is None:
    raise RuntimeError(f"failed loading native module from {NATIVE_PATH}")
NATIVE = importlib.util.module_from_spec(NATIVE_SPEC)
sys.modules[NATIVE_SPEC.name] = NATIVE
NATIVE_SPEC.loader.exec_module(NATIVE)

AUTH_PATH = PY_ROOT / "kitedb" / "replication_auth.py"
AUTH_SPEC = importlib.util.spec_from_file_location("kitedb_replication_auth", AUTH_PATH)
if AUTH_SPEC is None or AUTH_SPEC.loader is None:
    raise RuntimeError(f"failed loading replication auth module from {AUTH_PATH}")
AUTH = importlib.util.module_from_spec(AUTH_SPEC)
sys.modules[AUTH_SPEC.name] = AUTH
AUTH_SPEC.loader.exec_module(AUTH)

Database = NATIVE.Database
OpenOptions = NATIVE.OpenOptions
collect_replication_snapshot_transport_json = NATIVE.collect_replication_snapshot_transport_json
collect_replication_log_transport_json = NATIVE.collect_replication_log_transport_json
collect_replication_metrics_prometheus = NATIVE.collect_replication_metrics_prometheus

ReplicationAdminAuthConfig = AUTH.ReplicationAdminAuthConfig
create_replication_admin_authorizer = AUTH.create_replication_admin_authorizer


class FakeRequest:
    def __init__(self, headers: dict[str, str] | None = None):
        self.headers = headers or {}
        self.scope: dict[str, object] = {}


def _drain_replica(replica: object, max_frames: int, max_loops: int = 64) -> None:
    for _ in range(max_loops):
        applied = replica.replica_catch_up_once(max_frames)
        if applied == 0:
            return


def test_python_replication_transport_admin_flow_roundtrip():
    with tempfile.TemporaryDirectory() as tmpdir:
        primary_path = os.path.join(tmpdir, "primary.kitedb")
        primary_sidecar = os.path.join(tmpdir, "primary.sidecar")
        replica_path = os.path.join(tmpdir, "replica.kitedb")
        replica_sidecar = os.path.join(tmpdir, "replica.sidecar")

        primary = Database(
            primary_path,
            OpenOptions(
                replication_role="primary",
                replication_sidecar_path=primary_sidecar,
                replication_segment_max_bytes=1,
                replication_retention_min_entries=1,
                auto_checkpoint=False,
            ),
        )
        stale = Database(
            primary_path,
            OpenOptions(
                replication_role="primary",
                replication_sidecar_path=primary_sidecar,
                replication_segment_max_bytes=1,
                replication_retention_min_entries=1,
                auto_checkpoint=False,
            ),
        )
        replica = Database(
            replica_path,
            OpenOptions(
                replication_role="replica",
                replication_sidecar_path=replica_sidecar,
                replication_source_db_path=primary_path,
                replication_source_sidecar_path=primary_sidecar,
                auto_checkpoint=False,
            ),
        )

        try:
            primary.begin(False)
            primary.create_node("n:base")
            token_base = primary.commit_with_token()
            assert token_base.startswith("1:")

            replica.replica_bootstrap_from_snapshot()
            replica_status = replica.replica_replication_status()
            assert replica_status["needs_reseed"] is False
            assert replica_status["applied_log_index"] == 1

            snapshot = json.loads(
                collect_replication_snapshot_transport_json(primary, include_data=False)
            )
            snapshot_direct = json.loads(
                primary.export_replication_snapshot_transport_json(False)
            )
            assert snapshot["epoch"] == snapshot_direct["epoch"]
            assert snapshot["head_log_index"] == snapshot_direct["head_log_index"]

            log_page = json.loads(
                collect_replication_log_transport_json(
                    primary,
                    cursor=None,
                    max_frames=128,
                    max_bytes=1024 * 1024,
                    include_payload=False,
                )
            )
            log_page_direct = json.loads(
                primary.export_replication_log_transport_json(
                    None,
                    128,
                    1024 * 1024,
                    False,
                )
            )
            assert log_page["frame_count"] == log_page_direct["frame_count"]
            assert log_page["frame_count"] >= 1

            prometheus = collect_replication_metrics_prometheus(primary)
            assert "kitedb_replication_" in prometheus

            require_admin = create_replication_admin_authorizer(
                ReplicationAdminAuthConfig(mode="token", token="secret-token")
            )
            require_admin(FakeRequest({"authorization": "Bearer secret-token"}))
            with pytest.raises(PermissionError, match="not satisfied"):
                require_admin(FakeRequest({"authorization": "Bearer wrong-token"}))

            for i in range(6):
                primary.begin(False)
                primary.create_node(f"n:lag-{i}")
                primary.commit_with_token()

            lag_status = replica.replica_replication_status()
            primary.primary_report_replica_progress(
                "replica-a",
                lag_status["applied_epoch"],
                lag_status["applied_log_index"],
            )
            primary.primary_run_retention()

            with pytest.raises(Exception, match="reseed"):
                replica.replica_catch_up_once(64)
            assert replica.replica_replication_status()["needs_reseed"] is True

            primary.checkpoint()
            replica.replica_reseed_from_snapshot()
            assert replica.replica_replication_status()["needs_reseed"] is False
            assert replica.count_nodes() == primary.count_nodes()

            before = primary.primary_replication_status()["epoch"]
            promoted = primary.primary_promote_to_next_epoch()
            assert promoted > before

            stale.begin(False)
            stale.create_node("n:stale-write")
            with pytest.raises(Exception, match="stale primary"):
                stale.commit_with_token()
            if stale.has_transaction():
                stale.rollback()

            primary.begin(False)
            primary.create_node("n:post-promote")
            promoted_token = primary.commit_with_token()
            assert promoted_token.startswith(f"{promoted}:")

            assert not replica.wait_for_token(promoted_token, 5)
            _drain_replica(replica, 128)
            assert replica.wait_for_token(promoted_token, 2000)
            assert replica.count_nodes() == primary.count_nodes()
        finally:
            replica.close()
            stale.close()
            primary.close()