import os
import subprocess
import time
import tempfile
import shutil
from typing import Generator, Optional
from pathlib import Path
import pytest
# Connection settings; each one can be overridden through an environment
# variable of the same name.
RUSTBERG_HOST = os.environ.get("RUSTBERG_HOST", "localhost")
RUSTBERG_PORT = int(os.environ.get("RUSTBERG_PORT", "8181"))
RUSTBERG_URL = f"http://{RUSTBERG_HOST}:{RUSTBERG_PORT}"

# Explicit path to the server binary; None when the variable is unset.
RUSTBERG_BINARY = os.environ.get("RUSTBERG_BINARY")

# Seconds to wait for a freshly started server to answer its first request.
RUSTBERG_STARTUP_TIMEOUT = 30
class RustbergServer:
    """Run a Rustberg server binary as a child process for integration tests.

    The server is launched on 127.0.0.1 with ``--insecure-http`` and
    ``--no-auth`` and counts as ready once ``GET /v1/config`` succeeds.

    stdout and stderr are captured in temporary files instead of pipes:
    nothing reads the output while the server is running, so a pipe could
    fill up and block the child process.
    """

    def __init__(
        self,
        binary_path: str,
        data_dir: str,
        port: int,
        startup_timeout: Optional[float] = None,
    ):
        """Store the launch configuration; no process is started here.

        Args:
            binary_path: Path of the server executable.
            data_dir: Directory associated with this server instance. It is
                only stored; ``start()`` does not pass it on the command line.
            port: TCP port handed to the server via ``--port``.
            startup_timeout: Seconds to wait for readiness. ``None`` selects
                the module-level ``RUSTBERG_STARTUP_TIMEOUT``.
        """
        self.binary_path = binary_path
        self.data_dir = data_dir
        self.port = port
        self.startup_timeout = (
            RUSTBERG_STARTUP_TIMEOUT if startup_timeout is None else startup_timeout
        )
        self.process: Optional[subprocess.Popen] = None
        # Binary temp files receiving the child's stdout / stderr while it runs.
        self._stdout_log = None
        self._stderr_log = None

    def start(self) -> None:
        """Launch the server and block until it answers HTTP requests.

        Raises:
            RuntimeError: If a server is already running, or the process
                exits before becoming ready.
            TimeoutError: If the server is not ready within the timeout.
            OSError: If the binary cannot be executed.
        """
        if self.process is not None:
            raise RuntimeError("Server already running")
        env = os.environ.copy()
        env["RUST_LOG"] = "warn"
        self._stdout_log = tempfile.TemporaryFile()
        self._stderr_log = tempfile.TemporaryFile()
        try:
            self.process = subprocess.Popen(
                [
                    self.binary_path,
                    "--insecure-http",
                    "--no-auth",
                    "--host", "127.0.0.1",
                    "--port", str(self.port),
                ],
                env=env,
                stdout=self._stdout_log,
                stderr=self._stderr_log,
            )
        except OSError:
            # Nothing was spawned; just release the capture files.
            self._shutdown()
            raise
        self._wait_for_ready()

    def _wait_for_ready(self) -> None:
        """Poll ``/v1/config`` until the server responds or startup fails.

        On either failure the child process is stopped and forgotten, so no
        server is left behind and ``start()`` may be called again.

        Raises:
            RuntimeError: If the server process exits while being polled.
            TimeoutError: If no successful response arrives in time.
        """
        import http.client
        import urllib.request

        url = f"http://127.0.0.1:{self.port}/v1/config"
        # An empty ProxyHandler keeps the loopback probe from being routed
        # through a proxy configured in the environment.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        deadline = time.monotonic() + self.startup_timeout
        while time.monotonic() < deadline:
            if self.process is not None and self.process.poll() is not None:
                exit_code = self.process.returncode
                stdout_output, stderr_output = self._shutdown()
                raise RuntimeError(
                    f"Rustberg server exited unexpectedly with code {exit_code}\n"
                    f"Binary: {self.binary_path}\n"
                    f"Stdout: {stdout_output}\n"
                    f"Stderr: {stderr_output}"
                )
            try:
                with opener.open(url, timeout=1):
                    return
            except (OSError, http.client.HTTPException):
                # URLError/HTTPError are OSError subclasses; plain OSError also
                # covers socket timeouts and resets raised while the response
                # is being read, which urllib does not wrap.
                time.sleep(0.1)
        # Stop the child before reading its output: the capture files can be
        # read without blocking, and no half-started server keeps the port.
        _, stderr_output = self._shutdown()
        raise TimeoutError(
            f"Rustberg server did not start within {self.startup_timeout}s\n"
            f"Binary: {self.binary_path}\n"
            f"Stderr: {stderr_output}"
        )

    def stop(self) -> None:
        """Stop the server if it is running; otherwise do nothing."""
        if self.process is None:
            return
        self._shutdown()

    def _shutdown(self) -> tuple:
        """Stop the child (if any) and return its ``(stdout, stderr)`` text.

        The process gets five seconds to exit after ``terminate()`` before it
        is killed. ``self.process`` is reset to ``None`` in every case.
        """
        process, self.process = self.process, None
        if process is not None:
            if process.poll() is None:
                process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
        return self._drain_log("_stdout_log"), self._drain_log("_stderr_log")

    def _drain_log(self, attr: str) -> str:
        """Read, close and forget the capture file stored under ``attr``."""
        log = getattr(self, attr, None)
        setattr(self, attr, None)
        if log is None:
            return ""
        with log:
            log.seek(0)
            return log.read().decode("utf-8", errors="replace")
def find_rustberg_binary() -> Optional[str]:
    """Locate the Rustberg server executable.

    A non-empty ``RUSTBERG_BINARY`` setting is returned as-is. Otherwise the
    release build and then the debug build are looked up under ``target/``
    three directory levels above this file.

    Returns:
        The path of the first match as a string, or ``None`` if none exists.
    """
    if RUSTBERG_BINARY:
        return RUSTBERG_BINARY

    target_dir = Path(__file__).parent.parent.parent / "target"
    builds = (target_dir / profile / "rustberg" for profile in ("release", "debug"))
    found = (str(path) for path in builds if path.exists() and path.is_file())
    return next(found, None)
@pytest.fixture(scope="session")
def rustberg_binary() -> str:
    """Session fixture: path to the server binary, or skip if none is found."""
    located = find_rustberg_binary()
    if located is not None:
        return located
    pytest.skip("Rustberg binary not found. Build with: cargo build --all-features")
@pytest.fixture(scope="session")
def rustberg_server(rustberg_binary: str) -> Generator[str, None, None]:
    """Session fixture: start a Rustberg server and yield its base URL.

    The server listens on 127.0.0.1 at ``RUSTBERG_PORT``. A temporary data
    directory is created for it and removed again on teardown.
    """
    data_dir = tempfile.mkdtemp(prefix="rustberg_test_")
    server = RustbergServer(rustberg_binary, data_dir, RUSTBERG_PORT)
    try:
        server.start()
        yield f"http://127.0.0.1:{RUSTBERG_PORT}"
    finally:
        # Stop the server even when startup failed part-way, so no orphaned
        # process keeps the port; then remove the directory regardless.
        try:
            server.stop()
        finally:
            shutil.rmtree(data_dir, ignore_errors=True)
@pytest.fixture(scope="session")
def warehouse_path() -> Generator[str, None, None]:
    """Session fixture: yield a temporary warehouse directory path.

    The directory is removed on teardown; removal errors are ignored.
    """
    warehouse = tempfile.mkdtemp(prefix="rustberg_warehouse_")
    try:
        yield warehouse
    finally:
        # Clean up in a finally block, like rustberg_server, so the directory
        # is also removed when the generator is closed early.
        shutil.rmtree(warehouse, ignore_errors=True)
@pytest.fixture(scope="session")
def pyiceberg_catalog(rustberg_server: str):
    """Session fixture: a PyIceberg REST catalog pointed at the test server."""
    from pyiceberg.catalog import rest

    catalog_options = {"name": "rustberg_test", "uri": rustberg_server}
    return rest.RestCatalog(**catalog_options)
@pytest.fixture
def temp_namespace(pyiceberg_catalog) -> Generator[str, None, None]:
    """Create a uniquely named namespace, yield its name, then clean it up.

    Teardown drops every table listed in the namespace and then the namespace
    itself. Cleanup is best-effort: failures are logged, never raised, so they
    cannot turn a finished test into an error.
    """
    import logging
    import uuid

    log = logging.getLogger(__name__)
    namespace = f"test_ns_{uuid.uuid4().hex[:8]}"
    pyiceberg_catalog.create_namespace(namespace)
    yield namespace
    try:
        for table in pyiceberg_catalog.list_tables(namespace):
            # Handle each table separately so one failed drop does not leave
            # the remaining tables behind.
            try:
                pyiceberg_catalog.drop_table(table)
            except Exception:
                log.warning("Failed to drop table %r", table, exc_info=True)
        pyiceberg_catalog.drop_namespace(namespace)
    except Exception:
        log.warning("Failed to clean up namespace %r", namespace, exc_info=True)
@pytest.fixture
def sample_schema():
    """Return a six-column Iceberg schema covering several primitive types.

    ``id``, ``name`` and ``created_at`` are required; ``value``,
    ``is_active`` and ``count`` are optional.
    """
    from pyiceberg import types as iceberg_types
    from pyiceberg.schema import Schema

    # (field_id, name, field_type, required)
    columns = (
        (1, "id", iceberg_types.LongType(), True),
        (2, "name", iceberg_types.StringType(), True),
        (3, "created_at", iceberg_types.TimestampType(), True),
        (4, "value", iceberg_types.DoubleType(), False),
        (5, "is_active", iceberg_types.BooleanType(), False),
        (6, "count", iceberg_types.IntegerType(), False),
    )
    fields = [
        iceberg_types.NestedField(
            field_id=field_id,
            name=column_name,
            field_type=field_type,
            required=is_required,
        )
        for field_id, column_name, field_type, is_required in columns
    ]
    return Schema(*fields)
@pytest.fixture
def sample_partition_spec():
    """Return a partition spec with one day-transform field on source id 3."""
    from pyiceberg import partitioning
    from pyiceberg import transforms

    created_day = partitioning.PartitionField(
        source_id=3,
        field_id=1000,
        transform=transforms.DayTransform(),
        name="created_day",
    )
    return partitioning.PartitionSpec(created_day)
@pytest.fixture
def sample_sort_order():
    """Return a sort order with one identity-transform field on source id 1."""
    from pyiceberg import transforms
    from pyiceberg.table import sorting

    by_id = sorting.SortField(source_id=1, transform=transforms.IdentityTransform())
    return sorting.SortOrder(by_id)