briefcase-python 2.4.1

Python bindings for Briefcase AI
Documentation
"""
SplunkHECExporter — ship decision records to Splunk via HTTP Event Collector.

Buffers events and POSTs them in batches to the HEC endpoint using the
`requests` library (optional dependency).

Usage:
    from briefcase.exporters.splunk import SplunkHECExporter

    exporter = SplunkHECExporter(
        url="https://splunk.corp:8088",
        token="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        index="ai_decisions",
        sourcetype="briefcase_ai",
        batch_size=50,
    )
    # Wire into BriefcaseConfig:
    from briefcase.config import setup
    setup(exporter=exporter)
"""

from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

_INSTALL_HINT = (
    "requests is required for SplunkHECExporter. "
    "Install with: pip install requests  or  pip install briefcase-ai[splunk]"
)

# ── Optional dependency guard ─────────────────────────────────────────────────

try:
    import requests as _requests
    _REQUESTS_AVAILABLE = True
except ImportError:
    _requests = None  # type: ignore[assignment]
    _REQUESTS_AVAILABLE = False


# ── Main exporter class ───────────────────────────────────────────────────────

from briefcase.exporters.base import BaseExporter


class SplunkHECExporter(BaseExporter):
    """Export Briefcase decision records to Splunk via HEC.

    Args:
        url: Base URL of the Splunk instance (e.g. "https://splunk.corp:8088").
        token: HEC authentication token.
        index: Splunk index to write events into. Defaults to "main".
        sourcetype: Splunk sourcetype. Defaults to "briefcase_ai".
        batch_size: Number of events to accumulate before auto-flushing.
                    Set to 1 to disable batching (flush on every export).
        verify_ssl: Whether to verify SSL certificates. Defaults to True.
        max_retries: Number of retries on transient HTTP errors (5xx / network).
        timeout: HTTP request timeout in seconds. Defaults to 10.
    """

    _HEC_PATH = "/services/collector/event"

    def __init__(
        self,
        url: str,
        token: str,
        index: str = "main",
        sourcetype: str = "briefcase_ai",
        batch_size: int = 100,
        verify_ssl: bool = True,
        max_retries: int = 3,
        timeout: int = 10,
    ) -> None:
        if not _REQUESTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)

        self._url = url.rstrip("/") + self._HEC_PATH
        self._token = token
        self._index = index
        self._sourcetype = sourcetype
        self._batch_size = max(1, batch_size)
        self._verify_ssl = verify_ssl
        self._max_retries = max_retries
        self._timeout = timeout

        self._buffer: List[Dict[str, Any]] = []

    # ── BaseExporter interface ────────────────────────────────────────────────

    async def export(self, decision: Any) -> bool:
        """Buffer a decision record. Auto-flushes when batch_size is reached."""
        try:
            event = self._build_event(decision)
            self._buffer.append(event)
            if len(self._buffer) >= self._batch_size:
                return await self.flush()
            return True
        except Exception as e:
            logger.debug("SplunkHECExporter: export error: %s", e)
            return False

    async def flush(self) -> bool:
        """Send all buffered events to Splunk HEC. Returns True on success."""
        if not self._buffer:
            return True
        events = list(self._buffer)
        self._buffer.clear()
        return self._send(events)

    async def close(self) -> None:
        """Flush remaining events and release resources."""
        await self.flush()

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _build_event(self, decision: Any) -> Dict[str, Any]:
        """Build a HEC event envelope from a decision record."""
        if isinstance(decision, dict):
            payload = decision
        else:
            try:
                from dataclasses import asdict, fields
                fields(decision)
                payload = asdict(decision)
            except TypeError:
                payload = getattr(decision, "__dict__", {"repr": repr(decision)})

        return {
            "time": datetime.now(timezone.utc).timestamp(),
            "index": self._index,
            "sourcetype": self._sourcetype,
            "event": payload,
        }

    def _send(self, events: List[Dict[str, Any]]) -> bool:
        """POST *events* to HEC, retrying on transient failures."""
        # HEC accepts newline-delimited JSON events in a single POST body
        body = "\n".join(json.dumps(e, default=str) for e in events)
        headers = {
            "Authorization": f"Splunk {self._token}",
            "Content-Type": "application/json",
        }

        for attempt in range(self._max_retries + 1):
            try:
                resp = _requests.post(
                    self._url,
                    data=body,
                    headers=headers,
                    verify=self._verify_ssl,
                    timeout=self._timeout,
                )
                if resp.status_code < 500:
                    # 2xx = success, 4xx = client error (no retry)
                    if resp.status_code >= 400:
                        logger.warning(
                            "SplunkHECExporter: HEC returned %s: %s",
                            resp.status_code,
                            resp.text[:200],
                        )
                        return False
                    return True
                # 5xx — transient, retry
                logger.debug(
                    "SplunkHECExporter: transient %s on attempt %d/%d",
                    resp.status_code,
                    attempt + 1,
                    self._max_retries + 1,
                )
            except Exception as exc:
                logger.debug(
                    "SplunkHECExporter: network error on attempt %d/%d: %s",
                    attempt + 1,
                    self._max_retries + 1,
                    exc,
                )

        logger.warning(
            "SplunkHECExporter: gave up after %d attempts", self._max_retries + 1
        )
        return False