from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)

# Message raised as ImportError when the optional ``requests`` package is absent.
_INSTALL_HINT = (
    "requests is required for FHIRExporter. Install with: "
    "pip install requests or pip install briefcase-ai[fhir]"
)

# Coding written into the ``type`` field of every generated AuditEvent.
_AUDIT_EVENT_TYPE = {
    "system": "http://dicom.nema.org/resources/ontology/DCM",
    "code": "110100",
    "display": "Application Activity",
}

# Coding used for the ``agent.type`` of every generated AuditEvent.
_AGENT_TYPE_CODING = {
    "system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
    "code": "AUT",
    "display": "Author",
}

# Values for the AuditEvent ``outcome`` field (string codes, not integers).
OUTCOME_SUCCESS = "0"
OUTCOME_MINOR_FAILURE = "4"
OUTCOME_SERIOUS_FAILURE = "8"
OUTCOME_MAJOR_FAILURE = "12"
# ``requests`` is an optional dependency: record whether it could be imported
# so the module itself stays importable without it. ``_requests`` is None and
# ``_REQUESTS_AVAILABLE`` is False when the import fails.
try:
    import requests as _requests

    _REQUESTS_AVAILABLE = True
except ImportError:
    _requests = None
    _REQUESTS_AVAILABLE = False
from briefcase.exporters.base import BaseExporter
class FHIRExporter(BaseExporter):
    """Export decision records to a FHIR server as ``AuditEvent`` resources.

    Each decision is converted to an AuditEvent dict and buffered in memory.
    Once the buffer holds ``batch_size`` events (or on :meth:`flush` /
    :meth:`close`) the buffered events are sent: a single event is POSTed to
    ``<url>/AuditEvent``; several events are wrapped in a ``batch`` Bundle and
    POSTed to ``<url>/``.

    Delivery is best-effort: events are removed from the buffer before the
    POST is attempted and are not re-queued if it fails.
    """

    def __init__(
        self,
        url: str,
        token: Optional[str] = None,
        batch_size: int = 100,
        agent_name: str = "briefcase-ai",
        verify_ssl: bool = True,
        max_retries: int = 3,
        timeout: int = 10,
    ) -> None:
        """Configure the exporter.

        Args:
            url: Base URL of the FHIR server; trailing slashes are stripped.
            token: Optional bearer token sent in the ``Authorization`` header.
            batch_size: Number of buffered events that triggers an automatic
                flush. Values below 1 are treated as 1.
            agent_name: Name recorded as the AuditEvent agent and source
                observer.
            verify_ssl: Passed to ``requests`` as ``verify``.
            max_retries: Extra attempts after the first failed POST. Negative
                values are treated as 0 so at least one attempt is made.
            timeout: Per-request timeout passed to ``requests``.

        Raises:
            ImportError: If the ``requests`` package is not installed.
        """
        if not _REQUESTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        self._base_url = url.rstrip("/")
        self._token = token
        self._batch_size = max(1, batch_size)
        self._agent_name = agent_name
        self._verify_ssl = verify_ssl
        # A negative value would make the retry loop run zero times and
        # silently drop every batch without ever contacting the server.
        self._max_retries = max(0, max_retries)
        self._timeout = timeout
        self._buffer: List[Dict[str, Any]] = []

    async def export(self, decision: Any) -> bool:
        """Buffer one decision, flushing when the batch size is reached.

        Args:
            decision: A dict, dataclass instance, or arbitrary object
                describing the decision (see :meth:`_decision_to_dict`).

        Returns:
            True if the decision was buffered (and, when a flush was
            triggered, delivered); False if conversion or delivery failed.
            Exceptions are logged at debug level and never propagated.
        """
        try:
            audit_event = self._build_audit_event(decision)
            self._buffer.append(audit_event)
            if len(self._buffer) >= self._batch_size:
                return await self.flush()
            return True
        except Exception as e:
            logger.debug("FHIRExporter: export error: %s", e)
            return False

    async def flush(self) -> bool:
        """Send all buffered events to the server.

        Returns:
            True if the buffer was empty or the POST succeeded, False
            otherwise. Events are dropped from the buffer either way.
        """
        if not self._buffer:
            return True
        # Snapshot and clear first so events appended while the POST is in
        # flight end up in the next batch instead of being sent twice.
        events = list(self._buffer)
        self._buffer.clear()
        if len(events) == 1:
            resource, path = events[0], "/AuditEvent"
        else:
            resource, path = self._build_bundle(events), "/"
        # ``requests`` is blocking (and may retry several times); run it in
        # the default executor so the event loop is not stalled meanwhile.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._post_resource, resource, path)

    async def close(self) -> None:
        """Flush any remaining buffered events; the result is discarded."""
        await self.flush()

    def _build_audit_event(self, decision: Any) -> Dict[str, Any]:
        """Convert a decision into an AuditEvent resource dict.

        The identifier shown in ``entity.what.display`` is the first truthy
        value among the payload's ``decision_id``, ``id`` and ``trace_id``
        keys, or ``"unknown"``. A truthy ``error`` / ``errors`` entry marks
        the event as a minor failure and becomes ``outcomeDesc``. The whole
        payload is embedded as a JSON string in ``entity.detail``.
        """
        payload = self._decision_to_dict(decision)
        decision_id = str(
            payload.get("decision_id")
            or payload.get("id")
            or payload.get("trace_id")
            or "unknown"
        )
        failure = payload.get("error") or payload.get("errors")
        if failure:
            outcome = OUTCOME_MINOR_FAILURE
            outcome_desc = str(failure)
        else:
            outcome = OUTCOME_SUCCESS
            outcome_desc = "Success"
        return {
            "resourceType": "AuditEvent",
            "type": {
                "system": _AUDIT_EVENT_TYPE["system"],
                "code": _AUDIT_EVENT_TYPE["code"],
                "display": _AUDIT_EVENT_TYPE["display"],
            },
            "recorded": datetime.now(timezone.utc).isoformat(),
            "outcome": outcome,
            "outcomeDesc": outcome_desc,
            "agent": [
                {
                    "type": {
                        # Copy so events never share (and cannot mutate) the
                        # module-level constant.
                        "coding": [dict(_AGENT_TYPE_CODING)],
                    },
                    "name": self._agent_name,
                    "requestor": True,
                }
            ],
            "source": {
                "observer": {"display": self._agent_name},
            },
            "entity": [
                {
                    "what": {"display": decision_id},
                    "detail": [
                        {
                            "type": "decision_record",
                            # default=str stringifies values json cannot
                            # encode natively instead of raising.
                            "valueString": json.dumps(payload, default=str),
                        }
                    ],
                }
            ],
        }

    @staticmethod
    def _build_bundle(audit_events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Wrap AuditEvent dicts in a FHIR ``batch`` Bundle.

        Every entry carries a ``request`` element asking the server to
        create the resource (``POST AuditEvent``); FHIR requires
        ``entry.request`` on each entry of a batch or transaction Bundle.
        """
        return {
            "resourceType": "Bundle",
            "type": "batch",
            "entry": [
                {
                    "resource": evt,
                    "request": {"method": "POST", "url": "AuditEvent"},
                }
                for evt in audit_events
            ],
        }

    @staticmethod
    def _decision_to_dict(decision: Any) -> Dict[str, Any]:
        """Return a dict view of ``decision``.

        Dicts are returned as-is (not copied). Dataclass instances are
        converted with ``dataclasses.asdict``. Anything else falls back to
        the object's ``__dict__``, or ``{"repr": repr(decision)}`` when it
        has none.
        """
        if isinstance(decision, dict):
            return decision
        try:
            from dataclasses import asdict, fields

            # fields() raises TypeError for non-dataclasses, which routes
            # them to the generic fallback below.
            fields(decision)
            return asdict(decision)
        except TypeError:
            pass
        return getattr(decision, "__dict__", {"repr": repr(decision)})

    def _post_resource(self, resource: Dict[str, Any], path: str) -> bool:
        """POST ``resource`` as FHIR JSON to ``<base_url><path>``.

        Responses with status >= 500 and exceptions raised by the request
        are retried, up to ``max_retries`` extra attempts with no delay in
        between. A 4xx response is logged and treated as a definitive
        failure; any other status below 500 counts as success.

        Returns:
            True on success, False on a 4xx response or when every attempt
            failed.
        """
        url = self._base_url + path
        body = json.dumps(resource, default=str).encode("utf-8")
        headers: Dict[str, str] = {"Content-Type": "application/fhir+json"}
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"
        attempts = self._max_retries + 1
        for attempt in range(attempts):
            try:
                resp = _requests.post(
                    url,
                    data=body,
                    headers=headers,
                    verify=self._verify_ssl,
                    timeout=self._timeout,
                )
                if resp.status_code < 500:
                    if resp.status_code >= 400:
                        # Client errors will not succeed on retry.
                        logger.warning(
                            "FHIRExporter: server returned %s: %s",
                            resp.status_code,
                            resp.text[:200],
                        )
                        return False
                    return True
                logger.debug(
                    "FHIRExporter: transient %s on attempt %d/%d",
                    resp.status_code,
                    attempt + 1,
                    attempts,
                )
            except Exception as exc:
                # Best-effort exporter: any request failure is treated as
                # transient and retried rather than propagated.
                logger.debug(
                    "FHIRExporter: network error on attempt %d/%d: %s",
                    attempt + 1,
                    attempts,
                    exc,
                )
        logger.warning("FHIRExporter: gave up after %d attempts", attempts)
        return False