# briefcase-python 2.4.1
#
# Python bindings for Briefcase AI
# Documentation
"""
FHIRExporter — ship decision records to a FHIR R4 server as AuditEvent resources.

Each decision becomes a FHIR ``AuditEvent`` (resourceType). When batch_size > 1
the exporter accumulates events and flushes them as a single FHIR ``Bundle``
(type: "batch"), reducing round-trips to the server.

Authentication supports Bearer tokens; unauthenticated servers are also fine.
The only optional dependency is ``requests`` for HTTP transport.

Usage:
    from briefcase.exporters.fhir import FHIRExporter

    exporter = FHIRExporter(
        url="https://fhir.hospital.org/R4",
        token="<bearer-token>",
        batch_size=20,
    )
    from briefcase.config import setup
    setup(exporter=exporter)

FHIR references:
    AuditEvent R4: https://hl7.org/fhir/R4/auditevent.html
    Bundle batch:  https://hl7.org/fhir/R4/bundle.html
"""

from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Text of the ImportError raised by FHIRExporter when ``requests`` is missing.
_INSTALL_HINT = (
    "requests is required for FHIRExporter. Install with: "
    "pip install requests  or  pip install briefcase-ai[fhir]"
)

# ── FHIR coding constants ─────────────────────────────────────────────────────

# Coding used for AuditEvent.type (DICOM code 110100).
_AUDIT_EVENT_TYPE = dict(
    system="http://dicom.nema.org/resources/ontology/DCM",
    code="110100",
    display="Application Activity",
)

# Coding used inside AuditEvent.agent.type.coding.
_AGENT_TYPE_CODING = dict(
    system="http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
    code="AUT",
    display="Author",
)

# FHIR AuditEvent outcome codes (R4)
(
    OUTCOME_SUCCESS,
    OUTCOME_MINOR_FAILURE,
    OUTCOME_SERIOUS_FAILURE,
    OUTCOME_MAJOR_FAILURE,
) = ("0", "4", "8", "12")

# ── Optional dependency guard ─────────────────────────────────────────────────

try:
    import requests as _requests
    _REQUESTS_AVAILABLE = True
except ImportError:
    _requests = None  # type: ignore[assignment]
    _REQUESTS_AVAILABLE = False


# ── Main exporter class ───────────────────────────────────────────────────────

from briefcase.exporters.base import BaseExporter


class FHIRExporter(BaseExporter):
    """Export Briefcase decision records as FHIR R4 AuditEvent resources.

    Decisions are converted to AuditEvent resources and held in an in-memory
    buffer. When the buffer reaches ``batch_size`` (or ``flush``/``close`` is
    called) the events are POSTed: a single event goes to
    ``<url>/AuditEvent``; several events are wrapped in a Bundle of type
    "batch" and POSTed to ``<url>/``.

    Delivery is best-effort: the buffer is emptied before the request is
    made, so events whose POST ultimately fails are logged and dropped, not
    re-queued.

    Args:
        url: Base URL of the FHIR server (e.g. "https://fhir.hospital.org/R4").
             ``/AuditEvent`` or ``/`` is appended as needed.
        token: Optional Bearer token for authentication.
        batch_size: Events to accumulate before auto-flushing. When > 1,
                    events are bundled into a FHIR Bundle (type: "batch").
                    Values below 1 are treated as 1. Defaults to 100.
        agent_name: Value for the AuditEvent agent.name field. Defaults to
                    "briefcase-ai".
        verify_ssl: Whether to verify SSL certificates. Defaults to True.
        max_retries: Retries on transient 5xx / network errors. Negative
                     values are treated as 0 (one attempt). Defaults to 3.
        timeout: HTTP request timeout in seconds. Defaults to 10.

    Raises:
        ImportError: If the optional ``requests`` package is not installed.
    """

    def __init__(
        self,
        url: str,
        token: Optional[str] = None,
        batch_size: int = 100,
        agent_name: str = "briefcase-ai",
        verify_ssl: bool = True,
        max_retries: int = 3,
        timeout: int = 10,
    ) -> None:
        if not _REQUESTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)

        # Paths are appended with a leading "/", so drop trailing slashes here.
        self._base_url = url.rstrip("/")
        self._token = token
        self._batch_size = max(1, batch_size)
        self._agent_name = agent_name
        self._verify_ssl = verify_ssl
        # A negative value would make the retry loop run zero times and
        # silently never contact the server; always allow one attempt.
        self._max_retries = max(0, max_retries)
        self._timeout = timeout

        # AuditEvent dicts waiting to be sent by flush().
        self._buffer: List[Dict[str, Any]] = []

    # ── BaseExporter interface ────────────────────────────────────────────────

    async def export(self, decision: Any) -> bool:
        """Buffer a decision as an AuditEvent. Auto-flushes at batch_size.

        Returns:
            True if the event was buffered (and, when a flush was triggered,
            the flush succeeded); False if building or sending failed. Errors
            are logged and never raised.
        """
        try:
            audit_event = self._build_audit_event(decision)
            self._buffer.append(audit_event)
            if len(self._buffer) >= self._batch_size:
                return await self.flush()
            return True
        except Exception as e:
            logger.debug("FHIRExporter: export error: %s", e)
            return False

    async def flush(self) -> bool:
        """Send all buffered AuditEvents to the FHIR server.

        The buffer is emptied before sending; events are not re-queued when
        the POST fails. The blocking HTTP call runs in the event loop's
        default executor so other tasks keep running during network waits
        and retries.

        Returns:
            True if the buffer was empty or the server accepted the request,
            False otherwise.
        """
        if not self._buffer:
            return True
        # Snapshot and clear before awaiting so events exported while the
        # request is in flight are kept for the next flush.
        events = list(self._buffer)
        self._buffer.clear()

        if len(events) == 1:
            resource, path = events[0], "/AuditEvent"
        else:
            resource, path = self._build_bundle(events), "/"

        import asyncio

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._post_resource, resource, path
        )

    async def close(self) -> None:
        """Flush remaining events and release resources."""
        await self.flush()

    # ── FHIR resource builders ────────────────────────────────────────────────

    def _build_audit_event(self, decision: Any) -> Dict[str, Any]:
        """Convert a decision record into a FHIR R4 AuditEvent resource.

        The identifier shown in ``entity.what.display`` is the first truthy
        value among the ``decision_id``, ``id`` and ``trace_id`` keys, or
        "unknown". A truthy ``error``/``errors`` key marks the event as a
        minor failure and is used as the outcome description. The whole
        record is embedded as a JSON string in ``entity.detail``; values that
        are not JSON-serialisable are converted with ``str``.
        """
        payload = self._decision_to_dict(decision)

        decision_id = str(
            payload.get("decision_id")
            or payload.get("id")
            or payload.get("trace_id")
            or "unknown"
        )

        # Determine outcome from any error field
        failure = payload.get("error") or payload.get("errors")
        if failure:
            outcome = OUTCOME_MINOR_FAILURE
            outcome_desc = str(failure)
        else:
            outcome = OUTCOME_SUCCESS
            outcome_desc = "Success"

        return {
            "resourceType": "AuditEvent",
            "type": dict(_AUDIT_EVENT_TYPE),
            # Time the event is built, not the time the decision was made.
            "recorded": datetime.now(timezone.utc).isoformat(),
            "outcome": outcome,
            "outcomeDesc": outcome_desc,
            "agent": [
                {
                    "type": {
                        "coding": [_AGENT_TYPE_CODING],
                    },
                    "name": self._agent_name,
                    "requestor": True,
                }
            ],
            "source": {
                "observer": {"display": self._agent_name},
            },
            "entity": [
                {
                    "what": {"display": decision_id},
                    "detail": [
                        {
                            "type": "decision_record",
                            "valueString": json.dumps(payload, default=str),
                        }
                    ],
                }
            ],
        }

    @staticmethod
    def _build_bundle(audit_events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Wrap multiple AuditEvents into a FHIR Bundle of type 'batch'.

        Every entry carries a ``request`` element asking the server to create
        the resource (``POST AuditEvent``); FHIR requires ``entry.request``
        on batch bundles, and without it the server has no operation to
        perform for the entry.
        """
        return {
            "resourceType": "Bundle",
            "type": "batch",
            "entry": [
                {
                    "resource": evt,
                    "request": {"method": "POST", "url": "AuditEvent"},
                }
                for evt in audit_events
            ],
        }

    # ── Internal helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _decision_to_dict(decision: Any) -> Dict[str, Any]:
        """Return a mapping view of ``decision``.

        Dicts are returned as-is (not copied), dataclass instances are
        converted with ``dataclasses.asdict``, and any other object yields
        its ``__dict__`` or, lacking one, ``{"repr": repr(decision)}``.
        """
        if isinstance(decision, dict):
            return decision
        try:
            from dataclasses import asdict, fields
            # fields() raises TypeError for anything that is not a dataclass,
            # which routes non-dataclass objects to the fallback below.
            fields(decision)
            return asdict(decision)
        except TypeError:
            pass
        return getattr(decision, "__dict__", {"repr": repr(decision)})

    def _post_resource(self, resource: Dict[str, Any], path: str) -> bool:
        """POST a FHIR resource to ``self._base_url + path`` with retries.

        This call blocks. Responses below 400 count as success. 4xx responses
        are treated as permanent: they are logged and not retried. 5xx
        responses and exceptions raised while sending are retried
        immediately, up to ``max_retries`` additional attempts.

        Returns:
            True on success, False on a 4xx response or once all attempts
            are exhausted.
        """
        url = self._base_url + path
        body = json.dumps(resource, default=str).encode("utf-8")
        headers: Dict[str, str] = {"Content-Type": "application/fhir+json"}
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"

        total_attempts = self._max_retries + 1
        for attempt in range(total_attempts):
            try:
                resp = _requests.post(
                    url,
                    data=body,
                    headers=headers,
                    verify=self._verify_ssl,
                    timeout=self._timeout,
                )
                if resp.status_code < 500:
                    if resp.status_code >= 400:
                        logger.warning(
                            "FHIRExporter: server returned %s: %s",
                            resp.status_code,
                            resp.text[:200],
                        )
                        return False
                    return True
                logger.debug(
                    "FHIRExporter: transient %s on attempt %d/%d",
                    resp.status_code,
                    attempt + 1,
                    total_attempts,
                )
            except Exception as exc:
                # Deliberately broad: export is best-effort and must not
                # raise into the caller because of transport problems.
                logger.debug(
                    "FHIRExporter: network error on attempt %d/%d: %s",
                    attempt + 1,
                    total_attempts,
                    exc,
                )

        logger.warning(
            "FHIRExporter: gave up after %d attempts", total_attempts
        )
        return False