from typing import Optional
from contextlib import contextmanager
import logging
try:
from opentelemetry import trace
HAS_OTEL = True
except ImportError:
HAS_OTEL = False
from briefcase.integrations.lakefs.client import VersionedClient
logger = logging.getLogger(__name__)
class VersionedContextManager:
def __init__(
self,
briefcase_client,
repository: str,
branch: str = "main",
commit: str = "latest",
lakefs_endpoint: Optional[str] = None,
lakefs_access_key: Optional[str] = None,
lakefs_secret_key: Optional[str] = None,
require_live: bool = False,
):
self.briefcase_client = briefcase_client
self.repository = repository
self.branch = branch
self.commit = commit
if hasattr(briefcase_client, 'config'):
if lakefs_endpoint is None:
lakefs_endpoint = briefcase_client.config.get("lakefs_endpoint")
if lakefs_access_key is None:
lakefs_access_key = briefcase_client.config.get("lakefs_access_key")
if lakefs_secret_key is None:
lakefs_secret_key = briefcase_client.config.get("lakefs_secret_key")
self.lakefs_endpoint = lakefs_endpoint
self.lakefs_access_key = lakefs_access_key
self.lakefs_secret_key = lakefs_secret_key
self.require_live = require_live
self._lakefs_client = None
self._span = None
self._span_token = None
def __enter__(self):
if HAS_OTEL:
try:
tracer = trace.get_tracer(__name__)
self._span = tracer.start_as_current_span(
"lakefs.versioned_context",
attributes={
"lakefs.repository": self.repository,
"lakefs.branch": self.branch,
"lakefs.commit.requested": self.commit
}
)
self._span_token = self._span.__enter__()
except Exception as e:
logger.warning(f"Failed to create OTel span: {e}")
self._lakefs_client = VersionedClient(
repository=self.repository,
branch=self.branch,
commit=self.commit,
briefcase_client=self.briefcase_client,
lakefs_endpoint=self.lakefs_endpoint,
lakefs_access_key=self.lakefs_access_key,
lakefs_secret_key=self.lakefs_secret_key,
require_live=self.require_live,
)
return self._lakefs_client
def __exit__(self, exc_type, exc_val, exc_tb):
if self._span:
try:
self._span.__exit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(f"Failed to exit OTel span: {e}")
return False
@contextmanager
def versioned_context(
briefcase_client,
repository: str,
branch: str = "main",
commit: str = "latest",
**kwargs
):
ctx = VersionedContextManager(
briefcase_client,
repository,
branch,
commit,
**kwargs
)
with ctx as versioned_client:
yield versioned_client
LakeFSContextManager = VersionedContextManager
lakefs_context = versioned_context