# briefcase-python 2.4.1
# Python bindings for Briefcase AI — see the package documentation.
"""
SentinelConnector — ship decision records to Microsoft Sentinel via the
Azure Monitor HTTP Data Collector API (Log Analytics workspace).

Authentication uses HMAC-SHA256 SharedKey signing — no Azure SDK dependency.
The only optional dependency is ``requests`` for HTTP transport.

Usage:
    from briefcase.exporters.sentinel import SentinelConnector

    exporter = SentinelConnector(
        workspace_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        shared_key="<base64-encoded primary/secondary key>",
        log_type="BriefcaseDecisions",
        batch_size=50,
    )
    from briefcase.config import setup
    setup(exporter=exporter)
"""

from __future__ import annotations

import base64
import hashlib
import hmac
import json
import logging
import re
from datetime import datetime, timezone
from email.utils import formatdate
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

_INSTALL_HINT = (
    "requests is required for SentinelConnector. "
    "Install with: pip install requests  or  pip install briefcase-ai[sentinel]"
)

# ── Optional dependency guard ─────────────────────────────────────────────────
#
# ``requests`` is the module's only third-party dependency and is optional at
# import time; SentinelConnector.__init__ raises ImportError(_INSTALL_HINT)
# when it is missing, so merely importing this module never fails.

try:
    import requests as _requests
except ImportError:
    _requests = None  # type: ignore[assignment]
    _REQUESTS_AVAILABLE = False
else:
    _REQUESTS_AVAILABLE = True


# ── Main exporter class ───────────────────────────────────────────────────────

from briefcase.exporters.base import BaseExporter


class SentinelConnector(BaseExporter):
    """Export Briefcase decision records to a Microsoft Sentinel workspace.

    Uses the Azure Monitor HTTP Data Collector API
    (``POST /api/logs?api-version=2016-04-01``) with HMAC-SHA256 SharedKey
    authentication, so no Azure SDK is required.

    Records are buffered in memory and sent in batches. A batch that
    ultimately fails to send is dropped rather than re-queued, so the
    buffer cannot grow without bound during a persistent outage.

    Args:
        workspace_id: Log Analytics workspace ID (GUID).
        shared_key: Primary or secondary workspace key (base64-encoded).
        log_type: Custom log table name in Sentinel. Defaults to
                  "BriefcaseDecisions". Must match ``^[A-Za-z][A-Za-z0-9_]*$``.
        batch_size: Events to buffer before auto-flushing. Defaults to 100.
        verify_ssl: Whether to verify SSL certificates. Defaults to True.
        max_retries: Retries on transient 5xx / network errors. Defaults to 3.
        timeout: HTTP request timeout in seconds. Defaults to 10.

    Raises:
        ImportError: If the optional ``requests`` dependency is missing.
        ValueError: If *log_type* does not match the documented pattern.
    """

    _API_VERSION = "2016-04-01"
    _RESOURCE = "/api/logs"
    # Azure custom log names must start with a letter and may contain only
    # letters, digits, and underscores.
    _LOG_TYPE_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")

    def __init__(
        self,
        workspace_id: str,
        shared_key: str,
        log_type: str = "BriefcaseDecisions",
        batch_size: int = 100,
        verify_ssl: bool = True,
        max_retries: int = 3,
        timeout: int = 10,
    ) -> None:
        if not _REQUESTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        # Fail fast on an invalid table name instead of surfacing an opaque
        # HTTP 400 at flush time; the docstring has always promised this
        # pattern but it was previously unenforced.
        if not self._LOG_TYPE_RE.match(log_type):
            raise ValueError(
                f"log_type {log_type!r} must match ^[A-Za-z][A-Za-z0-9_]*$"
            )

        self._workspace_id = workspace_id
        self._shared_key = shared_key
        self._log_type = log_type
        self._batch_size = max(1, batch_size)  # guard against 0 / negative
        self._verify_ssl = verify_ssl
        self._max_retries = max_retries
        self._timeout = timeout

        self._url = (
            f"https://{workspace_id}.ods.opinsights.azure.com"
            f"{self._RESOURCE}?api-version={self._API_VERSION}"
        )
        self._buffer: List[Dict[str, Any]] = []

    # ── BaseExporter interface ────────────────────────────────────────────────

    async def export(self, decision: Any) -> bool:
        """Buffer *decision*; auto-flush once ``batch_size`` records queue up.

        Returns True if the record was buffered (or the triggered flush
        succeeded), False otherwise. Never raises.
        """
        try:
            self._buffer.append(self._decision_to_dict(decision))
            if len(self._buffer) >= self._batch_size:
                return await self.flush()
            return True
        except Exception as e:
            logger.debug("SentinelConnector: export error: %s", e)
            return False

    async def flush(self) -> bool:
        """Send all buffered records to Sentinel. Returns True on success.

        The buffer is cleared before sending; records from a failed batch
        are intentionally not re-queued (see class docstring).
        """
        if not self._buffer:
            return True
        records = list(self._buffer)
        self._buffer.clear()
        return self._send(records)

    async def close(self) -> None:
        """Flush remaining records and release resources."""
        await self.flush()

    # ── Internal helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _decision_to_dict(decision: Any) -> Dict[str, Any]:
        """Convert *decision* to a plain dict suitable for JSON encoding.

        Dicts pass through untouched; dataclass instances are deep-converted
        with ``dataclasses.asdict``; anything else falls back to its
        ``__dict__`` (or a ``repr`` wrapper for objects without one, e.g.
        slotted classes).
        """
        if isinstance(decision, dict):
            return decision
        from dataclasses import asdict, is_dataclass
        # The isinstance(type) check excludes dataclass *classes*, for which
        # asdict() would raise; they fall through to the __dict__ branch.
        if is_dataclass(decision) and not isinstance(decision, type):
            return asdict(decision)
        return getattr(decision, "__dict__", {"repr": repr(decision)})

    def _build_signature(
        self,
        date: str,
        content_length: int,
    ) -> str:
        """Compute the HMAC-SHA256 SharedKey Authorization header value.

        The string to sign follows the Azure Monitor specification:
            POST\\n<content-length>\\napplication/json\\nx-ms-date:<date>\\n/api/logs

        Args:
            date: RFC 1123 date; must equal the ``x-ms-date`` header value.
            content_length: Byte length of the request body.
        """
        string_to_sign = (
            f"POST\n{content_length}\napplication/json\n"
            f"x-ms-date:{date}\n{self._RESOURCE}"
        )
        key_bytes = base64.b64decode(self._shared_key)
        sig = hmac.new(
            key_bytes,
            string_to_sign.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        encoded = base64.b64encode(sig).decode("utf-8")
        return f"SharedKey {self._workspace_id}:{encoded}"

    def _send(self, records: List[Dict[str, Any]]) -> bool:
        """POST *records* to the Log Analytics endpoint with retries.

        Retries only on 5xx responses and network errors; any 4xx response
        is treated as permanent and logged. Returns True when the API
        accepts the batch.
        """
        body_bytes = json.dumps(records, default=str).encode("utf-8")
        content_length = len(body_bytes)

        for attempt in range(self._max_retries + 1):
            # Recompute the date and signature on every attempt: the service
            # rejects requests whose x-ms-date has drifted too far from the
            # server clock, so a header built once before a slow retry cycle
            # can go stale mid-loop.
            date = formatdate(usegmt=True)  # RFC 1123, as Azure requires
            headers = {
                "Content-Type": "application/json",
                "Authorization": self._build_signature(date, content_length),
                "Log-Type": self._log_type,
                "x-ms-date": date,
                # Empty value => ingestion time is used as TimeGenerated.
                "time-generated-field": "",
            }
            try:
                resp = _requests.post(
                    self._url,
                    data=body_bytes,
                    headers=headers,
                    verify=self._verify_ssl,
                    timeout=self._timeout,
                )
                if resp.status_code < 500:
                    if resp.status_code >= 400:
                        logger.warning(
                            "SentinelConnector: API returned %s: %s",
                            resp.status_code,
                            resp.text[:200],
                        )
                        return False
                    return True
                logger.debug(
                    "SentinelConnector: transient %s on attempt %d/%d",
                    resp.status_code,
                    attempt + 1,
                    self._max_retries + 1,
                )
            except Exception as exc:
                logger.debug(
                    "SentinelConnector: network error on attempt %d/%d: %s",
                    attempt + 1,
                    self._max_retries + 1,
                    exc,
                )

        logger.warning(
            "SentinelConnector: gave up after %d attempts",
            self._max_retries + 1,
        )
        return False