from __future__ import annotations
import hashlib
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from briefcase.storage.base import PythonStorageBackend, StorageReceipt
try:
import boto3 as boto3
import botocore.exceptions as _botocore_exc
except ImportError:
boto3 = None _botocore_exc = None
class S3ObjectLockStorage(PythonStorageBackend):
    """Write-once (WORM) storage backend backed by S3 Object Lock.

    Objects are written with GOVERNANCE-mode retention, so they cannot be
    overwritten or deleted until the retention date passes (except by
    principals holding ``s3:BypassGovernanceRetention``).

    NOTE(review): the target bucket must have Object Lock enabled at
    creation time for ``put_object`` with lock headers to succeed —
    confirm against deployment docs.
    """

    def __init__(
        self,
        bucket: str,
        region: str = "us-east-1",
        retention_days: int = 2555,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        endpoint_url: Optional[str] = None,
    ) -> None:
        """Create an S3 client bound to *bucket*.

        Args:
            bucket: Target S3 bucket name.
            region: AWS region for the client.
            retention_days: Object Lock retention window (default 2555 days,
                roughly seven years).
            access_key: Optional explicit access key; used only when
                *secret_key* is also given, otherwise boto3's default
                credential chain applies.
            secret_key: Optional explicit secret key (see *access_key*).
            endpoint_url: Optional custom endpoint (e.g. S3-compatible or
                local test servers).

        Raises:
            ImportError: If boto3 is not installed.
        """
        if boto3 is None:
            raise ImportError(
                "boto3 is required for S3ObjectLockStorage. "
                "Install it with: pip install 'briefcase-ai[aws]'"
            )
        self._bucket = bucket
        self._region = region
        self._retention_days = retention_days
        # Only pass explicit credentials when both halves were supplied;
        # otherwise defer to boto3's standard credential resolution.
        kwargs: dict = {"region_name": region}
        if access_key and secret_key:
            kwargs["aws_access_key_id"] = access_key
            kwargs["aws_secret_access_key"] = secret_key
        if endpoint_url:
            kwargs["endpoint_url"] = endpoint_url
        self._s3 = boto3.client("s3", **kwargs)

    @staticmethod
    def _sha256(data: bytes) -> str:
        """Return the hex SHA-256 digest of *data*."""
        return hashlib.sha256(data).hexdigest()

    def _retain_until(self) -> datetime:
        """Return the UTC datetime until which a newly written object is locked."""
        return datetime.now(timezone.utc) + timedelta(days=self._retention_days)

    async def write(self, key: str, record: bytes) -> StorageReceipt:
        """Write *record* under *key* with GOVERNANCE retention; return a receipt.

        NOTE(review): boto3 calls are synchronous and will block the event
        loop; consider wrapping in ``asyncio.to_thread`` if throughput matters.
        """
        digest = self._sha256(record)
        retain_until = self._retain_until()
        self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=record,
            ObjectLockMode="GOVERNANCE",
            # boto3's documented type for timestamp parameters is datetime;
            # previously a pre-formatted string was passed here.
            ObjectLockRetainUntilDate=retain_until,
        )
        return StorageReceipt(
            key=key,
            hash=digest,
            timestamp=datetime.now(timezone.utc),
            backend_metadata={
                "bucket": self._bucket,
                "region": self._region,
                # Keep the receipt's string format identical to prior releases.
                "retain_until": retain_until.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
            },
        )

    async def read(self, key: str) -> bytes:
        """Return the raw bytes stored under *key*."""
        response = self._s3.get_object(Bucket=self._bucket, Key=key)
        return response["Body"].read()

    async def verify(self, key: str, receipt: StorageReceipt) -> bool:
        """Best-effort check that the stored object still matches *receipt*.

        Returns False on any failure (missing key, network error, hash
        mismatch) rather than raising — callers treat this as a boolean
        integrity probe.
        """
        try:
            data = await self.read(key)
            return self._sha256(data) == receipt.hash
        except Exception:
            return False

    async def list_keys(
        self, prefix: str, start: datetime, end: datetime
    ) -> List[str]:
        """List keys under *prefix* whose LastModified is within [start, end].

        Naive *start*/*end* datetimes are assumed to be UTC (S3 returns
        timezone-aware LastModified values, so naive bounds would raise
        on comparison).
        """
        # Normalize the bounds once, before pagination — previously this
        # ran inside the per-object loop on every iteration.
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)
        if end.tzinfo is None:
            end = end.replace(tzinfo=timezone.utc)
        paginator = self._s3.get_paginator("list_objects_v2")
        keys: List[str] = []
        for page in paginator.paginate(Bucket=self._bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                if start <= obj["LastModified"] <= end:
                    keys.append(obj["Key"])
        return keys