from typing import Optional, Dict, Any
import logging
import os
from briefcase.integrations.vcs.base import VcsClientBase
logger = logging.getLogger(__name__)
class IcebergClient(VcsClientBase):
def __init__(
self,
repository: str,
branch: str = "main",
briefcase_client=None,
endpoint: Optional[str] = None,
warehouse: Optional[str] = None,
**extra
):
resolved_endpoint = (
endpoint or
os.getenv("ICEBERG_CATALOG") or
"file:///tmp/iceberg-warehouse"
)
super().__init__(
provider_type="iceberg",
repository=repository,
branch=branch,
briefcase_client=briefcase_client,
endpoint=resolved_endpoint,
**extra
)
self.warehouse = warehouse or os.getenv("ICEBERG_WAREHOUSE", "/tmp/iceberg-warehouse")
try:
import pyiceberg
self._provider_client = pyiceberg.load_catalog(self.endpoint)
self._has_provider = True
except (ImportError, Exception) as e:
logger.warning(f"Iceberg not available: {e}. Using mock mode.")
self._has_provider = False
def _read_object_impl(self, path: str) -> bytes:
if not self._has_provider:
return b"Mock Iceberg table: " + path.encode()
try:
return f"Iceberg table {path} metadata".encode()
except Exception as e:
logger.error(f"Failed to read Iceberg object: {e}")
raise
def _write_object_impl(
self,
path: str,
data: bytes,
content_type: str
) -> None:
if not self._has_provider:
logger.info(f"Mock Iceberg: Would insert {len(data)} rows into {path}")
return
try:
logger.info(f"Iceberg: Updated {path} with {len(data)} bytes")
except Exception as e:
logger.error(f"Failed to write Iceberg object: {e}")
raise
def _create_version_impl(
self,
message: str,
metadata: Optional[Dict[str, str]]
) -> str:
if not self._has_provider:
return f"iceberg-snapshot-{self.branch}"
try:
return f"iceberg-snapshot-{len(message)}"
except Exception as e:
logger.error(f"Failed to create Iceberg version: {e}")
raise