briefcase-python 2.4.1

Python bindings for Briefcase AI
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
"""
Base class for VCS client implementations.

Provides common functionality for all VCS providers including:
- Object read/write operations
- Version creation and tracking
- Metadata capture
- Context manager support
- OpenTelemetry instrumentation
"""

from typing import Optional, Dict, Any, Tuple, Union
from datetime import datetime
import logging
import os

# OpenTelemetry is an optional dependency. HAS_OTEL records whether the import
# succeeded so the instrumentation code in this module can return early
# instead of touching `trace` when the package is not installed.
try:
    from opentelemetry import trace
    HAS_OTEL = True
except ImportError:
    HAS_OTEL = False

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class VcsClientBase:
    """
    Base class for all VCS client implementations.

    Wraps three provider hooks (``_read_object_impl``, ``_write_object_impl``
    and ``_create_version_impl``) with uniform metadata collection, error
    logging and optional OpenTelemetry span events. Supports the context
    manager protocol; leaving the ``with`` block calls :meth:`close`.

    Error contract: ``read_object``, ``write_object`` and ``create_version``
    do not propagate exceptions raised by the provider hooks. A failure is
    logged and reported through ``"status": "error"`` plus an ``"error"``
    message in the metadata dict.

    Configuration priority (highest to lowest):
        1. Explicit parameters
        2. Briefcase client configuration
        3. Environment variables
        4. Provider defaults

    NOTE(review): this base class only stores the explicit parameters it is
    given; it performs no fallback lookup itself. Resolving the lower-priority
    sources is presumably left to subclasses or callers -- confirm.

    Attributes:
        provider_type: Type of VCS provider (e.g., "dvc", "nessie", "pachyderm")
        briefcase_client: Optional BriefcaseClient for instrumentation
        endpoint: API endpoint for VCS provider
        repository: Repository or bucket name
        branch: Branch or version name
        version: Current version identifier (SHA, tag, etc.); ``None`` until
            ``create_version`` succeeds
    """

    def __init__(
        self,
        provider_type: str,
        repository: str,
        branch: str = "main",
        briefcase_client=None,
        endpoint: Optional[str] = None,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        token: Optional[str] = None,
        **extra
    ):
        """
        Initialize VCS client with provider-specific configuration.

        All arguments are stored on the instance unchanged; no connection is
        opened here.

        Args:
            provider_type: Type of VCS provider
            repository: Repository/bucket name
            branch: Branch or version name (default: "main")
            briefcase_client: Optional BriefcaseClient; when truthy, span
                events are recorded for read/write/version operations
            endpoint: API endpoint
            access_key: Access key for authentication
            secret_key: Secret key for authentication
            token: Bearer token for authentication
            **extra: Provider-specific configuration options, kept as a dict
                in ``self.extra``
        """
        self.provider_type = provider_type
        self.repository = repository
        self.branch = branch
        self.briefcase_client = briefcase_client
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.extra = extra

        # Version tracking: populated by a successful create_version().
        self.version = None
        self._version_metadata = {}

        # Provider-specific client (to be set by subclasses). close() calls
        # its close() method when one exists.
        self._provider_client = None
        self._has_provider = False

    def read_object(
        self,
        path: str,
        return_metadata: bool = False
    ) -> Union[bytes, Tuple[bytes, Dict]]:
        """
        Read an object from VCS with automatic instrumentation.

        Args:
            path: Object path (e.g., "data/file.csv")
            return_metadata: If True, return (content, metadata) tuple

        Returns:
            Object content as bytes, optionally with metadata dict. If the
            provider hook raises, the error is logged and recorded in the
            metadata (``status == "error"``); the content is then ``b""``.
        """
        start_time = datetime.now()
        content = b""
        metadata = {
            "path": path,
            "provider": self.provider_type,
            "repository": self.repository,
            "branch": self.branch,
            "version": self.version,
        }

        try:
            # Provider-specific implementation
            content = self._read_object_impl(path)
            metadata["size"] = len(content)
            metadata["status"] = "success"
        except Exception as e:
            # Best-effort contract: report the failure instead of raising.
            logger.error(f"Failed to read {path} from {self.provider_type}: {e}")
            metadata["status"] = "error"
            metadata["error"] = str(e)

        # Record access if instrumentation available
        if self.briefcase_client:
            self._record_access(path, metadata, start_time)

        if return_metadata:
            return content, metadata
        return content

    def write_object(
        self,
        path: str,
        data: bytes,
        content_type: str = "application/octet-stream"
    ) -> Dict[str, Any]:
        """
        Write an object to VCS with automatic instrumentation.

        Args:
            path: Object path
            data: Object content as bytes
            content_type: Content type (default: "application/octet-stream")

        Returns:
            Metadata dict with write result. ``status`` is "success" or
            "error"; on error the exception text is stored under ``error``
            and no exception is raised.
        """
        start_time = datetime.now()
        metadata = {
            "path": path,
            "provider": self.provider_type,
            "repository": self.repository,
            "branch": self.branch,
            "size": len(data),
        }

        try:
            # Provider-specific implementation
            self._write_object_impl(path, data, content_type)
            metadata["status"] = "success"
        except Exception as e:
            # Best-effort contract: report the failure instead of raising.
            logger.error(f"Failed to write {path} to {self.provider_type}: {e}")
            metadata["status"] = "error"
            metadata["error"] = str(e)

        # Record write if instrumentation available
        if self.briefcase_client:
            self._record_write(path, metadata, start_time)

        return metadata

    def create_version(
        self,
        message: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Create a new version (commit, tag, branch point) in VCS.

        On success the new identifier becomes ``self.version`` and the version
        record is kept for ``capture_metadata``. On failure the previous
        version state is left untouched.

        Args:
            message: Version message/commit message
            metadata: Optional metadata to attach to version. The dict is
                copied, so later changes by the caller do not alter the
                stored version record.

        Returns:
            Version metadata dict with identifier (``version_id``) and
            ``status``; on error it carries ``error`` instead of
            ``version_id``.
        """
        start_time = datetime.now()
        version_metadata = {
            "provider": self.provider_type,
            "repository": self.repository,
            "branch": self.branch,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            # Copy so the record is not aliased to the caller's dict.
            "metadata": dict(metadata) if metadata else {}
        }

        try:
            # Provider-specific implementation (receives the caller's
            # original argument, which may be None).
            version_id = self._create_version_impl(message, metadata)
            version_metadata["version_id"] = version_id
            version_metadata["status"] = "success"
            self.version = version_id
            self._version_metadata = version_metadata
        except Exception as e:
            # Best-effort contract: report the failure instead of raising.
            logger.error(f"Failed to create version in {self.provider_type}: {e}")
            version_metadata["status"] = "error"
            version_metadata["error"] = str(e)

        # Record version creation
        if self.briefcase_client:
            self._record_version_creation(version_metadata, start_time)

        return version_metadata

    def capture_metadata(self) -> Dict[str, Any]:
        """
        Capture current VCS metadata snapshot.

        Returns:
            Dictionary containing:
            - provider_type: VCS provider type
            - endpoint: API endpoint
            - repository: Repository name
            - branch: Branch name
            - version: Current version identifier
            - timestamp: Capture timestamp (ISO 8601, from ``datetime.now()``)
            - version_metadata: Record of the last successful
              ``create_version`` call, or an empty dict if there was none
        """
        return {
            "provider_type": self.provider_type,
            "endpoint": self.endpoint,
            "repository": self.repository,
            "branch": self.branch,
            "version": self.version,
            "timestamp": datetime.now().isoformat(),
            "version_metadata": self._version_metadata,
        }

    def __enter__(self):
        """Context manager entry; returns the client itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the client, never suppress exceptions."""
        self.close()
        return False

    def close(self):
        """
        Close provider-specific client connection.

        Does nothing when no provider client is set or it has no ``close``
        attribute. An exception raised while closing is logged as a warning
        and not propagated.
        """
        try:
            if self._provider_client and hasattr(self._provider_client, 'close'):
                self._provider_client.close()
        except Exception as e:
            logger.warning(f"Failed to close {self.provider_type} client: {e}")

    # Protected methods for subclasses to override

    def _read_object_impl(self, path: str) -> bytes:
        """
        Provider-specific object read implementation.

        Must be overridden by subclasses.

        Args:
            path: Object path

        Returns:
            Object content as bytes

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement _read_object_impl")

    def _write_object_impl(
        self,
        path: str,
        data: bytes,
        content_type: str
    ) -> None:
        """
        Provider-specific object write implementation.

        Must be overridden by subclasses.

        Args:
            path: Object path
            data: Object content as bytes
            content_type: Content type

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement _write_object_impl")

    def _create_version_impl(
        self,
        message: str,
        metadata: Optional[Dict[str, str]]
    ) -> str:
        """
        Provider-specific version creation implementation.

        Must be overridden by subclasses.

        Args:
            message: Version message
            metadata: Version metadata

        Returns:
            Version identifier (SHA, tag, etc.)

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement _create_version_impl")

    # Instrumentation helpers

    def _recording_span(self):
        """
        Return the current OpenTelemetry span if events can be recorded on it.

        Returns:
            The current span, or ``None`` when OpenTelemetry is not installed,
            there is no current span, or the span is not recording.
        """
        if not HAS_OTEL:
            return None
        span = trace.get_current_span()
        if not span or not span.is_recording():
            return None
        return span

    @staticmethod
    def _elapsed_ms(start_time: datetime) -> float:
        """
        Return milliseconds elapsed between ``start_time`` and now.

        NOTE: both ends come from naive ``datetime.now()``, i.e. the wall
        clock, so the value is affected by system clock changes.
        """
        return (datetime.now() - start_time).total_seconds() * 1000

    # Instrumentation methods

    def _record_access(
        self,
        path: str,
        metadata: Dict[str, Any],
        start_time: datetime
    ) -> None:
        """
        Record object access in OpenTelemetry trace.

        Adds a ``<provider>.file_accessed`` event to the current span. Any
        failure while recording is logged as a warning and swallowed so that
        instrumentation cannot break a read.

        Args:
            path: Object path that was read
            metadata: Read metadata; ``size`` and ``status`` are reported
            start_time: When the read started, used for ``duration_ms``
        """
        try:
            current_span = self._recording_span()
            if current_span is None:
                return

            # Set provider-level attributes
            current_span.set_attribute("vcs.provider", self.provider_type)
            current_span.set_attribute("vcs.repository", self.repository)
            current_span.set_attribute("vcs.branch", self.branch)
            if self.version:
                current_span.set_attribute("vcs.version", self.version)

            # Record file access event. The status is included (as for write
            # and version events) so a failed read is distinguishable from a
            # successful read of an empty object.
            current_span.add_event(
                f"{self.provider_type}.file_accessed",
                attributes={
                    "vcs.file_path": path,
                    "vcs.file_size": metadata.get("size", 0),
                    "vcs.status": metadata.get("status", "unknown"),
                    "duration_ms": self._elapsed_ms(start_time)
                }
            )
        except Exception as e:
            logger.warning(f"Failed to record access instrumentation: {e}")

    def _record_write(
        self,
        path: str,
        metadata: Dict[str, Any],
        start_time: datetime
    ) -> None:
        """
        Record object write in OpenTelemetry trace.

        Adds a ``<provider>.file_written`` event to the current span. Any
        failure while recording is logged as a warning and swallowed.

        Args:
            path: Object path that was written
            metadata: Write metadata; ``size`` and ``status`` are reported
            start_time: When the write started, used for ``duration_ms``
        """
        try:
            current_span = self._recording_span()
            if current_span is None:
                return

            # Set provider-level attributes
            current_span.set_attribute("vcs.provider", self.provider_type)
            current_span.set_attribute("vcs.repository", self.repository)
            current_span.set_attribute("vcs.branch", self.branch)

            # Record write event
            current_span.add_event(
                f"{self.provider_type}.file_written",
                attributes={
                    "vcs.file_path": path,
                    "vcs.file_size": metadata.get("size", 0),
                    "vcs.status": metadata.get("status", "unknown"),
                    "duration_ms": self._elapsed_ms(start_time)
                }
            )
        except Exception as e:
            logger.warning(f"Failed to record write instrumentation: {e}")

    def _record_version_creation(
        self,
        metadata: Dict[str, Any],
        start_time: datetime
    ) -> None:
        """
        Record version creation in OpenTelemetry trace.

        Adds a ``<provider>.version_created`` event to the current span. Any
        failure while recording is logged as a warning and swallowed.

        Args:
            metadata: Version metadata; ``version_id``, ``message`` and
                ``status`` are reported when present
            start_time: When version creation started, used for
                ``duration_ms``
        """
        try:
            current_span = self._recording_span()
            if current_span is None:
                return

            # Set version attributes
            current_span.set_attribute("vcs.provider", self.provider_type)
            if metadata.get("version_id"):
                current_span.set_attribute("vcs.version_id", metadata["version_id"])

            # Record version creation event
            current_span.add_event(
                f"{self.provider_type}.version_created",
                attributes={
                    "vcs.message": metadata.get("message", ""),
                    "vcs.status": metadata.get("status", "unknown"),
                    "duration_ms": self._elapsed_ms(start_time)
                }
            )
        except Exception as e:
            logger.warning(f"Failed to record version creation instrumentation: {e}")