# briefcase-python 2.4.1
#
# Python bindings for Briefcase AI
# Documentation
"""Pachyderm client for container-native data versioning."""

from typing import Optional, Dict, Any
import logging
import os

from briefcase.integrations.vcs.base import VcsClientBase

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class PachydermClient(VcsClientBase):
    """
    Pachyderm client for container-native data versioning and lineage.

    Pachyderm provides data versioning, pipelines, and reproducible
    data science workflows in Kubernetes environments.

    Configuration priority for the endpoint (highest to lowest):
        1. Explicit ``endpoint`` parameter
        2. Environment variables, checked in order:
           PACHD_GRPC_ADDR, then PACHD_ADDRESS
        3. Local Pachyderm service endpoint (grpc://localhost:30650)

    If the ``python_pachyderm`` package cannot be imported, or constructing
    its client raises, the instance falls back to "mock mode": the
    ``_*_impl`` methods return/log placeholder values instead of failing.

    Usage:
        client = PachydermClient(
            repository="my-data-repo",
            branch="main",
            briefcase_client=briefcase_client,
            endpoint="grpc://localhost:30650"
        )
        data = client.read_object("data/raw/dataset.parquet")
        client.create_version("Raw data ingestion")
    """

    # Environment variables consulted (in order) when no explicit endpoint
    # is given. PACHD_GRPC_ADDR stays first so that deployments which
    # already relied on it keep resolving to the same endpoint.
    _ENDPOINT_ENV_VARS = ("PACHD_GRPC_ADDR", "PACHD_ADDRESS")

    # Fallback endpoint used when neither a parameter nor an env var is set.
    _DEFAULT_ENDPOINT = "grpc://localhost:30650"

    def __init__(
        self,
        repository: str,
        branch: str = "main",
        briefcase_client: Optional[Any] = None,
        endpoint: Optional[str] = None,
        token: Optional[str] = None,
        **extra: Any
    ):
        """
        Initialize Pachyderm client.

        Args:
            repository: Pachyderm repository name
            branch: Pachyderm branch/commit name (default: "main")
            briefcase_client: Optional BriefcaseClient for instrumentation
            endpoint: Pachyderm API endpoint (grpc address). When omitted
                or empty, the environment variables PACHD_GRPC_ADDR and
                PACHD_ADDRESS are tried in that order, then the local
                default "grpc://localhost:30650" is used.
            token: Authentication token (forwarded to the base class)
            **extra: Additional Pachyderm configuration (forwarded to the
                base class)
        """
        # First non-empty environment value wins; an empty string counts
        # as "unset", matching the truthiness-based fallback chain.
        env_endpoint = next(
            (value for value in map(os.getenv, self._ENDPOINT_ENV_VARS) if value),
            None,
        )
        resolved_endpoint = endpoint or env_endpoint or self._DEFAULT_ENDPOINT

        super().__init__(
            provider_type="pachyderm",
            repository=repository,
            branch=branch,
            briefcase_client=briefcase_client,
            endpoint=resolved_endpoint,
            token=token,
            **extra
        )

        # Initialize Pachyderm client. The import is deliberately local so
        # that python_pachyderm remains an optional dependency.
        try:
            import python_pachyderm
            # NOTE(review): presumably self.endpoint is set by the base
            # class from the ``endpoint`` argument above -- confirm.
            # NOTE(review): a full "grpc://host:port" URL is passed as
            # ``host`` and ``token`` is not handed to the provider client;
            # verify both against the python_pachyderm Client API.
            self._provider_client = python_pachyderm.Client(host=self.endpoint)
            self._has_provider = True
        except Exception as e:
            # ImportError is a subclass of Exception, so this single clause
            # covers both a missing package and a failing constructor.
            # Best-effort by design: degrade to mock mode instead of raising.
            logger.warning("Pachyderm not available: %s. Using mock mode.", e)
            self._has_provider = False

    def _read_object_impl(self, path: str) -> bytes:
        """
        Read object from Pachyderm repository.

        Args:
            path: Path of the object inside the repository

        Returns:
            Placeholder bytes derived from ``path``. In mock mode the
            content is prefixed with "Mock Pachyderm content: "; otherwise
            with "Pachyderm file: ". No provider call is made yet.

        Raises:
            Exception: Any error raised in the provider branch is logged
                and re-raised unchanged.
        """
        if not self._has_provider:
            # Mock mode
            return b"Mock Pachyderm content: " + path.encode()

        try:
            # In real implementation, would use pach client to read file
            return f"Pachyderm file: {path}".encode()
        except Exception as e:
            logger.error("Failed to read Pachyderm object: %s", e)
            raise

    def _write_object_impl(
        self,
        path: str,
        data: bytes,
        content_type: str
    ) -> None:
        """
        Write object to Pachyderm repository.

        Currently only logs the size and destination of the write; no
        provider call is made in either mode.

        Args:
            path: Destination path inside the repository
            data: Payload to write
            content_type: Content type of the payload (currently unused)

        Raises:
            Exception: Any error raised in the provider branch is logged
                and re-raised unchanged.
        """
        if not self._has_provider:
            logger.info(
                "Mock Pachyderm: Would put %s bytes to %s", len(data), path
            )
            return

        try:
            logger.info("Pachyderm: Wrote %s bytes to %s", len(data), path)
        except Exception as e:
            logger.error("Failed to write Pachyderm object: %s", e)
            raise

    def _create_version_impl(
        self,
        message: str,
        metadata: Optional[Dict[str, str]]
    ) -> str:
        """
        Create Pachyderm version (start/finish commit).

        Args:
            message: Commit message
            metadata: Optional commit metadata (currently unused)

        Returns:
            A placeholder commit identifier:
            "pachyderm-<branch>-mock-commit" in mock mode, otherwise
            "pach-commit-<len(message)>". No provider call is made yet.

        Raises:
            Exception: Any error raised in the provider branch is logged
                and re-raised unchanged.
        """
        if not self._has_provider:
            # Mock mode
            return f"pachyderm-{self.branch}-mock-commit"

        try:
            # In real implementation, would finish commit in Pachyderm
            return f"pach-commit-{len(message)}"
        except Exception as e:
            logger.error("Failed to create Pachyderm version: %s", e)
            raise