from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
_INSTALL_HINT = (
"requests is required for SplunkHECExporter. "
"Install with: pip install requests or pip install briefcase-ai[splunk]"
)
try:
import requests as _requests
_REQUESTS_AVAILABLE = True
except ImportError:
_requests = None _REQUESTS_AVAILABLE = False
from briefcase.exporters.base import BaseExporter
class SplunkHECExporter(BaseExporter):
_HEC_PATH = "/services/collector/event"
def __init__(
self,
url: str,
token: str,
index: str = "main",
sourcetype: str = "briefcase_ai",
batch_size: int = 100,
verify_ssl: bool = True,
max_retries: int = 3,
timeout: int = 10,
) -> None:
if not _REQUESTS_AVAILABLE:
raise ImportError(_INSTALL_HINT)
self._url = url.rstrip("/") + self._HEC_PATH
self._token = token
self._index = index
self._sourcetype = sourcetype
self._batch_size = max(1, batch_size)
self._verify_ssl = verify_ssl
self._max_retries = max_retries
self._timeout = timeout
self._buffer: List[Dict[str, Any]] = []
async def export(self, decision: Any) -> bool:
try:
event = self._build_event(decision)
self._buffer.append(event)
if len(self._buffer) >= self._batch_size:
return await self.flush()
return True
except Exception as e:
logger.debug("SplunkHECExporter: export error: %s", e)
return False
async def flush(self) -> bool:
if not self._buffer:
return True
events = list(self._buffer)
self._buffer.clear()
return self._send(events)
async def close(self) -> None:
await self.flush()
def _build_event(self, decision: Any) -> Dict[str, Any]:
if isinstance(decision, dict):
payload = decision
else:
try:
from dataclasses import asdict, fields
fields(decision)
payload = asdict(decision)
except TypeError:
payload = getattr(decision, "__dict__", {"repr": repr(decision)})
return {
"time": datetime.now(timezone.utc).timestamp(),
"index": self._index,
"sourcetype": self._sourcetype,
"event": payload,
}
def _send(self, events: List[Dict[str, Any]]) -> bool:
body = "\n".join(json.dumps(e, default=str) for e in events)
headers = {
"Authorization": f"Splunk {self._token}",
"Content-Type": "application/json",
}
for attempt in range(self._max_retries + 1):
try:
resp = _requests.post(
self._url,
data=body,
headers=headers,
verify=self._verify_ssl,
timeout=self._timeout,
)
if resp.status_code < 500:
if resp.status_code >= 400:
logger.warning(
"SplunkHECExporter: HEC returned %s: %s",
resp.status_code,
resp.text[:200],
)
return False
return True
logger.debug(
"SplunkHECExporter: transient %s on attempt %d/%d",
resp.status_code,
attempt + 1,
self._max_retries + 1,
)
except Exception as exc:
logger.debug(
"SplunkHECExporter: network error on attempt %d/%d: %s",
attempt + 1,
self._max_retries + 1,
exc,
)
logger.warning(
"SplunkHECExporter: gave up after %d attempts", self._max_retries + 1
)
return False