import pytest
import httpx
from pyiceberg.catalog.rest import RestCatalog
class TestCatalogConfig:
    """Checks for the catalog configuration endpoint (``GET /v1/config``)."""

    def test_config_endpoint(self, rustberg_server: str):
        """The config endpoint returns 200 and a JSON object with both config maps.

        Args:
            rustberg_server: URL prefix of the server under test (fixture
                supplied from outside this class).
        """
        response = httpx.get(f"{rustberg_server}/v1/config")
        assert response.status_code == 200
        data = response.json()
        # Verify the container type before probing keys: a non-object body
        # should fail with a readable assertion, not a TypeError from ``in``.
        assert isinstance(data, dict), (
            f"expected a JSON object, got {type(data).__name__}"
        )
        # The Iceberg REST spec lists both keys as required in the config
        # response, so an object missing either one is not a valid answer.
        missing = [key for key in ("defaults", "overrides") if key not in data]
        assert not missing, f"config response is missing keys: {missing}"

    def test_config_with_warehouse(self, rustberg_server: str):
        """The config endpoint accepts a ``warehouse`` query parameter.

        Args:
            rustberg_server: URL prefix of the server under test (fixture
                supplied from outside this class).
        """
        response = httpx.get(
            f"{rustberg_server}/v1/config", params={"warehouse": "test"}
        )
        assert response.status_code == 200
class TestHealthEndpoints:
    """Smoke checks for the server's liveness and readiness probes."""

    def test_health_endpoint(self, rustberg_server: str):
        """``GET /health`` must answer with HTTP 200."""
        health_url = f"{rustberg_server}/health"
        reply = httpx.get(health_url)
        assert reply.status_code == 200

    def test_ready_endpoint(self, rustberg_server: str):
        """``GET /ready`` must answer with HTTP 200."""
        ready_url = f"{rustberg_server}/ready"
        reply = httpx.get(ready_url)
        assert reply.status_code == 200
class TestCatalogProperties:
    """Sanity checks on attributes exposed by the catalog client fixture."""

    def test_catalog_properties(self, catalog: RestCatalog):
        """``catalog.properties`` is a ``dict``."""
        assert isinstance(catalog.properties, dict)

    def test_catalog_name(self, catalog: RestCatalog):
        """``catalog.name`` is set and has non-zero length."""
        catalog_name = catalog.name
        assert catalog_name is not None
        assert len(catalog_name) > 0
class TestErrorHandling:
    """Checks the HTTP status codes returned for invalid or unknown requests."""

    def test_not_found_table(self, rustberg_server: str):
        """Looking up a table that does not exist yields 404."""
        table_url = f"{rustberg_server}/v1/namespaces/nonexistent/tables/nonexistent"
        reply = httpx.get(table_url)
        assert reply.status_code == 404

    def test_not_found_namespace(self, rustberg_server: str):
        """Looking up a namespace that does not exist yields 404."""
        namespace_url = f"{rustberg_server}/v1/namespaces/nonexistent_namespace_12345"
        reply = httpx.get(namespace_url)
        assert reply.status_code == 404

    def test_method_not_allowed(self, rustberg_server: str):
        """PATCH on the config endpoint is rejected with 404 or 405."""
        reply = httpx.patch(f"{rustberg_server}/v1/config")
        assert reply.status_code in (404, 405)

    def test_invalid_json_body(self, rustberg_server: str):
        """A body that is not valid JSON is rejected with 400 or 422."""
        json_headers = {"Content-Type": "application/json"}
        reply = httpx.post(
            f"{rustberg_server}/v1/namespaces",
            content="not valid json",
            headers=json_headers,
        )
        assert reply.status_code in (400, 422)
class TestIcebergRESTSpec:
    """Checks that response payloads have the shapes these tests expect.

    Every test takes ``rustberg_server``, the URL prefix of the server under
    test (fixture supplied from outside this class).
    """

    def test_namespace_list_response_format(self, rustberg_server: str):
        """Listing namespaces returns an object with a ``namespaces`` list."""
        response = httpx.get(f"{rustberg_server}/v1/namespaces")
        assert response.status_code == 200
        data = response.json()
        assert "namespaces" in data
        assert isinstance(data["namespaces"], list)

    def test_namespace_create_response_format(self, rustberg_server: str):
        """Creating a namespace returns ``namespace`` and ``properties`` keys."""
        import uuid

        # Random suffix keeps repeated or parallel runs from colliding.
        ns_name = f"spec_test_{uuid.uuid4().hex[:8]}"
        try:
            response = httpx.post(
                f"{rustberg_server}/v1/namespaces",
                json={"namespace": [ns_name], "properties": {}},
            )
            assert response.status_code == 200
            data = response.json()
            assert "namespace" in data
            assert "properties" in data
        finally:
            # Runs even when an assertion above fails, so a failing test does
            # not leave the namespace behind on the server.
            httpx.delete(f"{rustberg_server}/v1/namespaces/{ns_name}")

    def test_table_list_response_format(self, rustberg_server: str):
        """Listing tables in a namespace returns an ``identifiers`` list."""
        import uuid

        ns_name = f"table_list_test_{uuid.uuid4().hex[:8]}"
        try:
            created = httpx.post(
                f"{rustberg_server}/v1/namespaces",
                json={"namespace": [ns_name], "properties": {}},
            )
            # Fail at the setup step instead of surfacing later as an
            # unexplained 404 from the table listing.
            assert created.status_code == 200, (
                f"namespace setup failed with HTTP {created.status_code}"
            )

            response = httpx.get(f"{rustberg_server}/v1/namespaces/{ns_name}/tables")
            assert response.status_code == 200
            data = response.json()
            assert "identifiers" in data
            assert isinstance(data["identifiers"], list)
        finally:
            httpx.delete(f"{rustberg_server}/v1/namespaces/{ns_name}")

    def test_error_response_format(self, rustberg_server: str):
        """A 404 carries an ``error`` object with ``message`` and ``type``."""
        response = httpx.get(
            f"{rustberg_server}/v1/namespaces/nonexistent_namespace_12345"
        )
        assert response.status_code == 404
        data = response.json()
        assert "error" in data
        assert "message" in data["error"]
        assert "type" in data["error"]

    def test_content_type_json(self, rustberg_server: str):
        """The namespace listing is served with a JSON content type."""
        response = httpx.get(f"{rustberg_server}/v1/namespaces")
        assert response.status_code == 200
        assert "application/json" in response.headers.get("content-type", "")
class TestConcurrency:
    """Checks that the catalog handles several create calls issued in parallel."""

    def test_concurrent_namespace_creation(self, catalog: RestCatalog):
        """Five namespaces created from five threads all show up in the listing.

        Args:
            catalog: Catalog client fixture supplied from outside this class.
        """
        import concurrent.futures
        import logging
        import uuid

        worker_count = 5
        # Names are chosen before any work is submitted so that cleanup can
        # target every namespace that may exist, even if a worker raises.
        namespaces = [
            f"concurrent_ns_{uuid.uuid4().hex[:8]}" for _ in range(worker_count)
        ]
        try:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count
            ) as executor:
                futures = [
                    executor.submit(catalog.create_namespace, ns) for ns in namespaces
                ]
                for future in concurrent.futures.as_completed(futures):
                    # Re-raises any exception from the worker thread.
                    future.result()

            all_namespaces = catalog.list_namespaces()
            for ns in namespaces:
                assert any(ns in str(n) for n in all_namespaces), (
                    f"namespace {ns} missing from listing"
                )
        finally:
            # Best-effort cleanup: a failure here is logged, never raised, so
            # it cannot mask the real test outcome.
            for ns in namespaces:
                try:
                    catalog.drop_namespace(ns)
                except Exception:
                    logging.getLogger(__name__).warning(
                        "cleanup: could not drop namespace %s", ns, exc_info=True
                    )

    def test_concurrent_table_creation(
        self,
        catalog: RestCatalog,
        temp_namespace: str,
        sample_schema,
    ):
        """Five tables created from five threads all show up in the listing.

        Args:
            catalog: Catalog client fixture supplied from outside this class.
            temp_namespace: Name of the namespace the tables are created in
                (fixture supplied from outside this class).
            sample_schema: Schema passed to ``create_table`` (fixture supplied
                from outside this class).
        """
        import concurrent.futures
        import logging
        import uuid

        worker_count = 5
        # Names are chosen before any work is submitted so that cleanup can
        # target every table that may exist, even if a worker raises.
        table_names = [
            f"concurrent_table_{uuid.uuid4().hex[:8]}" for _ in range(worker_count)
        ]
        try:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count
            ) as executor:
                futures = [
                    executor.submit(
                        catalog.create_table,
                        identifier=f"{temp_namespace}.{table_name}",
                        schema=sample_schema,
                    )
                    for table_name in table_names
                ]
                for future in concurrent.futures.as_completed(futures):
                    # Re-raises any exception from the worker thread.
                    future.result()

            all_tables = catalog.list_tables(temp_namespace)
            for table_name in table_names:
                assert any(table_name in str(t) for t in all_tables), (
                    f"table {table_name} missing from listing"
                )
        finally:
            # Best-effort cleanup of the tables this test created; a failure
            # is logged, never raised, so it cannot mask the test outcome.
            for table_name in table_names:
                try:
                    catalog.drop_table(f"{temp_namespace}.{table_name}")
                except Exception:
                    logging.getLogger(__name__).warning(
                        "cleanup: could not drop table %s.%s",
                        temp_namespace,
                        table_name,
                        exc_info=True,
                    )