from typing import Optional, Dict, Any, Tuple, Union
from datetime import datetime
import logging
import os
try:
from opentelemetry import trace
HAS_OTEL = True
except ImportError:
HAS_OTEL = False
logger = logging.getLogger(__name__)
class VcsClientBase:
def __init__(
self,
provider_type: str,
repository: str,
branch: str = "main",
briefcase_client=None,
endpoint: Optional[str] = None,
access_key: Optional[str] = None,
secret_key: Optional[str] = None,
token: Optional[str] = None,
**extra
):
self.provider_type = provider_type
self.repository = repository
self.branch = branch
self.briefcase_client = briefcase_client
self.endpoint = endpoint
self.access_key = access_key
self.secret_key = secret_key
self.token = token
self.extra = extra
self.version = None
self._version_metadata = {}
self._provider_client = None
self._has_provider = False
def read_object(
self,
path: str,
return_metadata: bool = False
) -> Union[bytes, Tuple[bytes, Dict]]:
start_time = datetime.now()
content = b""
metadata = {
"path": path,
"provider": self.provider_type,
"repository": self.repository,
"branch": self.branch,
"version": self.version,
}
try:
content = self._read_object_impl(path)
metadata["size"] = len(content)
metadata["status"] = "success"
except Exception as e:
logger.error(f"Failed to read {path} from {self.provider_type}: {e}")
metadata["status"] = "error"
metadata["error"] = str(e)
if self.briefcase_client:
self._record_access(path, metadata, start_time)
if return_metadata:
return content, metadata
return content
def write_object(
self,
path: str,
data: bytes,
content_type: str = "application/octet-stream"
) -> Dict[str, Any]:
start_time = datetime.now()
metadata = {
"path": path,
"provider": self.provider_type,
"repository": self.repository,
"branch": self.branch,
"size": len(data),
}
try:
self._write_object_impl(path, data, content_type)
metadata["status"] = "success"
except Exception as e:
logger.error(f"Failed to write {path} to {self.provider_type}: {e}")
metadata["status"] = "error"
metadata["error"] = str(e)
if self.briefcase_client:
self._record_write(path, metadata, start_time)
return metadata
def create_version(
self,
message: str,
metadata: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
start_time = datetime.now()
version_metadata = {
"provider": self.provider_type,
"repository": self.repository,
"branch": self.branch,
"message": message,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
try:
version_id = self._create_version_impl(message, metadata)
version_metadata["version_id"] = version_id
version_metadata["status"] = "success"
self.version = version_id
self._version_metadata = version_metadata
except Exception as e:
logger.error(f"Failed to create version in {self.provider_type}: {e}")
version_metadata["status"] = "error"
version_metadata["error"] = str(e)
if self.briefcase_client:
self._record_version_creation(version_metadata, start_time)
return version_metadata
def capture_metadata(self) -> Dict[str, Any]:
return {
"provider_type": self.provider_type,
"endpoint": self.endpoint,
"repository": self.repository,
"branch": self.branch,
"version": self.version,
"timestamp": datetime.now().isoformat(),
"version_metadata": self._version_metadata,
}
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def close(self):
try:
if self._provider_client and hasattr(self._provider_client, 'close'):
self._provider_client.close()
except Exception as e:
logger.warning(f"Failed to close {self.provider_type} client: {e}")
def _read_object_impl(self, path: str) -> bytes:
raise NotImplementedError(f"{self.__class__.__name__} must implement _read_object_impl")
def _write_object_impl(
self,
path: str,
data: bytes,
content_type: str
) -> None:
raise NotImplementedError(f"{self.__class__.__name__} must implement _write_object_impl")
def _create_version_impl(
self,
message: str,
metadata: Optional[Dict[str, str]]
) -> str:
raise NotImplementedError(f"{self.__class__.__name__} must implement _create_version_impl")
def _record_access(
self,
path: str,
metadata: Dict[str, Any],
start_time: datetime
) -> None:
if not HAS_OTEL:
return
try:
current_span = trace.get_current_span()
if not current_span or not current_span.is_recording():
return
current_span.set_attribute("vcs.provider", self.provider_type)
current_span.set_attribute("vcs.repository", self.repository)
current_span.set_attribute("vcs.branch", self.branch)
if self.version:
current_span.set_attribute("vcs.version", self.version)
current_span.add_event(
f"{self.provider_type}.file_accessed",
attributes={
"vcs.file_path": path,
"vcs.file_size": metadata.get("size", 0),
"duration_ms": (datetime.now() - start_time).total_seconds() * 1000
}
)
except Exception as e:
logger.warning(f"Failed to record access instrumentation: {e}")
def _record_write(
self,
path: str,
metadata: Dict[str, Any],
start_time: datetime
) -> None:
if not HAS_OTEL:
return
try:
current_span = trace.get_current_span()
if not current_span or not current_span.is_recording():
return
current_span.set_attribute("vcs.provider", self.provider_type)
current_span.set_attribute("vcs.repository", self.repository)
current_span.set_attribute("vcs.branch", self.branch)
current_span.add_event(
f"{self.provider_type}.file_written",
attributes={
"vcs.file_path": path,
"vcs.file_size": metadata.get("size", 0),
"vcs.status": metadata.get("status", "unknown"),
"duration_ms": (datetime.now() - start_time).total_seconds() * 1000
}
)
except Exception as e:
logger.warning(f"Failed to record write instrumentation: {e}")
def _record_version_creation(
self,
metadata: Dict[str, Any],
start_time: datetime
) -> None:
if not HAS_OTEL:
return
try:
current_span = trace.get_current_span()
if not current_span or not current_span.is_recording():
return
current_span.set_attribute("vcs.provider", self.provider_type)
if metadata.get("version_id"):
current_span.set_attribute("vcs.version_id", metadata["version_id"])
current_span.add_event(
f"{self.provider_type}.version_created",
attributes={
"vcs.message": metadata.get("message", ""),
"vcs.status": metadata.get("status", "unknown"),
"duration_ms": (datetime.now() - start_time).total_seconds() * 1000
}
)
except Exception as e:
logger.warning(f"Failed to record version creation instrumentation: {e}")