# briefcase-python 2.4.1
#
# Python bindings for Briefcase AI — see the project documentation.
# (Header lines commented out: bare text before the module docstring is a SyntaxError.)
"""
GxPExporter — export decision records enriched with 21 CFR Part 11 fields.

Part 11 fields added to every record:
  electronic_signature: {signer_id, timestamp, meaning}
  reason_for_change: str (required when context_version changes)
  record_hash: SHA-256 of the complete enriched record (excluding record_hash itself)
  prior_record_hash: hash of the preceding record (None for the first)
  system_validation_id: IQ/OQ/PQ reference

Hash chain: sequential linking creates a tamper-evident audit trail.
verify_chain(records) classmethod validates the chain integrity.
"""

from __future__ import annotations

import csv
import hashlib
import io
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from briefcase.exporters.base import BaseExporter
from briefcase.config import BriefcaseConfig


class GxPExporter(BaseExporter):
    """Export decisions as Part 11-compliant records with a hash chain.

    Every exported decision is enriched with 21 CFR Part 11 fields
    (electronic signature, reason for change, validation reference) and
    linked to the preceding record through a SHA-256 hash chain, producing
    a tamper-evident audit trail that :meth:`verify_chain` can validate.

    Args:
        signer_id: Electronic signature identity string.
        system_validation_id: Validation record ID (IQ/OQ/PQ reference).
        format: Output format — "json" (newline-delimited) or "csv".
        meaning: Signature meaning — "production", "review", or "approval".
        reason_for_change: Default reason for change (can be per-call).
    """

    def __init__(
        self,
        signer_id: str,
        system_validation_id: str,
        format: str = "json",
        meaning: str = "production",
        reason_for_change: str = "",
    ) -> None:
        self._signer_id = signer_id
        self._validation_id = system_validation_id
        # Validated lazily in get_output() so construction never raises.
        self._format = format
        self._meaning = meaning
        self._default_reason = reason_for_change
        # Hash-chain state: hash of the most recently committed record.
        self._prior_hash: Optional[str] = None
        # Context version of the last committed record; used to auto-fill
        # reason_for_change when the version changes between records.
        self._last_context_version: Optional[str] = None
        # Records committed so far, in chain order.
        self._buffer: List[dict] = []

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sha256(data: str) -> str:
        """Return the hex-encoded SHA-256 digest of *data* (UTF-8)."""
        return hashlib.sha256(data.encode()).hexdigest()

    @staticmethod
    def _extract_context_version(decision: Any) -> str:
        """Best-effort context version lookup.

        Tries ``decision.context.version`` first, then the flat
        ``decision.context_version`` attribute; returns "" when neither
        is present or the value is falsy.
        """
        ctx = getattr(decision, "context", None)
        if ctx is not None:
            return str(getattr(ctx, "version", "") or "")
        return str(getattr(decision, "context_version", "") or "")

    @staticmethod
    def _decision_to_dict(decision: Any) -> dict:
        """Convert a decision to a plain dict for hashing/serialisation."""
        if isinstance(decision, dict):
            return decision
        from dataclasses import asdict, is_dataclass
        # is_dataclass is explicit (and cheaper) versus catching the
        # TypeError that fields() raises on non-dataclasses.  The isinstance
        # guard excludes dataclass *classes*, which asdict cannot handle.
        if is_dataclass(decision) and not isinstance(decision, type):
            return asdict(decision)
        # Fallback: __dict__, or a repr wrapper for objects without one
        # (e.g. __slots__ instances).
        return getattr(decision, "__dict__", {"repr": repr(decision)})

    def _build_record(
        self,
        decision: Any,
        reason_for_change: str,
    ) -> tuple:
        """Build a Part 11-enriched record dict.

        Side-effect free with respect to exporter state: the hash chain is
        only advanced by :meth:`_commit_record`, so a failed export cannot
        leave the chain inconsistent with the buffer.

        Returns:
            ``(record, context_version)`` — the enriched record (including
            its ``record_hash``) and the context version extracted from
            *decision*, to be committed on success.
        """
        base = self._decision_to_dict(decision)
        context_version = self._extract_context_version(decision)

        # Determine reason_for_change: per-call value wins, then the
        # constructor default, then an auto-generated note when the context
        # version changed (Part 11 requires a documented reason).
        if not reason_for_change:
            reason_for_change = self._default_reason
        if (
            self._last_context_version is not None
            and context_version != self._last_context_version
            and not reason_for_change
        ):
            reason_for_change = (
                f"context_version changed from "
                f"{self._last_context_version!r} to {context_version!r}"
            )

        # Build record without record_hash.
        record = {
            **base,
            "electronic_signature": {
                "signer_id": self._signer_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "meaning": self._meaning,
            },
            "reason_for_change": reason_for_change,
            "prior_record_hash": self._prior_hash,
            "system_validation_id": self._validation_id,
        }

        # record_hash covers every field except itself; sort_keys gives a
        # canonical serialisation so verify_chain can re-derive the digest.
        record["record_hash"] = self._sha256(
            json.dumps(record, sort_keys=True, default=str)
        )
        return record, context_version

    def _commit_record(self, record: dict, context_version: str) -> None:
        """Buffer *record* and advance the hash-chain state."""
        self._buffer.append(record)
        self._prior_hash = record["record_hash"]
        self._last_context_version = context_version

    # ------------------------------------------------------------------
    # BaseExporter interface
    # ------------------------------------------------------------------

    async def export(
        self,
        decision: Any,
        reason_for_change: str = "",
    ) -> bool:
        """Enrich *decision* with Part 11 fields, persist, and buffer it.

        Returns:
            True on success.  On any failure, False — and the exporter
            state (buffer and hash chain) is left untouched.  Previously a
            failed storage write still buffered the record and advanced the
            chain, leaving the in-memory state inconsistent with the
            reported failure.
        """
        try:
            record, context_version = self._build_record(
                decision, reason_for_change
            )

            # Persist to the storage backend first (if configured) so that
            # a write failure does not corrupt the in-memory chain.
            config = BriefcaseConfig.get()
            if config.storage is not None:
                key = f"gxp/{record.get('decision_id', id(record))}"
                data = json.dumps(record, default=str).encode()
                await config.storage.write(key, data)

            self._commit_record(record, context_version)
            return True
        except Exception:
            # Boundary: exporters report failure via the bool return value
            # rather than propagating exceptions to the caller.
            return False

    async def flush(self) -> None:
        """No-op: records are buffered in memory."""

    async def close(self) -> None:
        """No-op: no external resources."""

    # ------------------------------------------------------------------
    # Output serialisation
    # ------------------------------------------------------------------

    def get_output(self) -> str:
        """Return buffered records in the configured format.

        Raises:
            ValueError: if the configured format is neither "json" nor "csv".
        """
        if self._format == "json":
            return "\n".join(
                json.dumps(r, default=str) for r in self._buffer
            )
        elif self._format == "csv":
            return self._to_csv()
        raise ValueError(f"Unknown format: {self._format!r}")

    def _to_csv(self) -> str:
        """Serialise the buffer as CSV, flattening nested dicts."""
        if not self._buffer:
            return ""
        # Flatten nested dicts for CSV.
        flat_records = [self._flatten(r) for r in self._buffer]
        # Header = union of all keys in first-seen order.  Taking only the
        # first record's keys (with extrasaction="ignore") silently dropped
        # fields that appear only in later records — unacceptable data loss
        # for an audit-trail export.
        headers: Dict[str, None] = {}
        for flat in flat_records:
            for key in flat:
                headers.setdefault(key, None)
        out = io.StringIO()
        writer = csv.DictWriter(out, fieldnames=list(headers), restval="")
        writer.writeheader()
        writer.writerows(flat_records)
        return out.getvalue()

    @staticmethod
    def _flatten(record: dict, prefix: str = "") -> dict:
        """Flatten nested dicts with dot-notation keys (e.g. ``a.b.c``)."""
        result = {}
        for k, v in record.items():
            full_key = f"{prefix}{k}" if prefix else k
            if isinstance(v, dict):
                result.update(GxPExporter._flatten(v, f"{full_key}."))
            else:
                result[full_key] = v
        return result

    # ------------------------------------------------------------------
    # Chain verification
    # ------------------------------------------------------------------

    @classmethod
    def verify_chain(cls, records: List[dict]) -> bool:
        """Return True iff the hash chain in *records* is intact.

        Validates that:
        - Each record's record_hash matches a SHA-256 of the record without
          record_hash (canonical sort_keys JSON, mirroring _build_record).
        - Each record's prior_record_hash matches the previous record's
          record_hash (None for the first record).
        """
        prior_hash: Optional[str] = None

        for record in records:
            stored_hash = record.get("record_hash")
            if stored_hash is None:
                return False

            # Re-compute hash from the record without record_hash.
            record_without_hash = {
                k: v for k, v in record.items() if k != "record_hash"
            }
            expected_hash = cls._sha256(
                json.dumps(record_without_hash, sort_keys=True, default=str)
            )
            if stored_hash != expected_hash:
                return False

            if record.get("prior_record_hash") != prior_hash:
                return False

            prior_hash = stored_hash

        return True