from __future__ import annotations
import base64
import hashlib
import hmac
import json
import logging
from datetime import datetime, timezone
from email.utils import formatdate
from typing import Any, Dict, List
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Error text raised by SentinelConnector.__init__ when ``requests`` is missing.
_INSTALL_HINT = (
    "requests is required for SentinelConnector. "
    "Install with: pip install requests or pip install briefcase-ai[sentinel]"
)

# ``requests`` is an optional dependency: importing this module must not fail
# when it is absent. The outcome is recorded in ``_REQUESTS_AVAILABLE`` and
# ``_requests`` is bound to ``None`` so the name always exists.
try:
    import requests as _requests
except ImportError:
    _requests = None
    _REQUESTS_AVAILABLE = False
else:
    _REQUESTS_AVAILABLE = True
from briefcase.exporters.base import BaseExporter
class SentinelConnector(BaseExporter):
    """Buffering exporter that POSTs decision records to an Azure workspace.

    Records are accumulated in memory and sent as one JSON array to
    ``https://<workspace_id>.ods.opinsights.azure.com/api/logs``. Each
    request is authorised with a ``SharedKey`` header whose signature is an
    HMAC-SHA256 over the request metadata, keyed with the workspace shared
    key.

    Delivery is best-effort: a batch is removed from the buffer before it is
    sent and is not re-queued when sending fails.
    """

    _API_VERSION = "2016-04-01"
    _RESOURCE = "/api/logs"

    def __init__(
        self,
        workspace_id: str,
        shared_key: str,
        log_type: str = "BriefcaseDecisions",
        batch_size: int = 100,
        verify_ssl: bool = True,
        max_retries: int = 3,
        timeout: int = 10,
    ) -> None:
        """Configure the connector; no network traffic happens here.

        Args:
            workspace_id: Workspace identifier; used in the endpoint host
                name and in the ``Authorization`` header.
            shared_key: Base64-encoded workspace key used to sign requests.
                It is decoded lazily, on the first send.
            log_type: Value of the ``Log-Type`` request header.
            batch_size: Number of buffered records that triggers an
                automatic flush. Values below 1 are treated as 1.
            verify_ssl: Passed to ``requests`` as ``verify``.
            max_retries: Extra attempts after the first POST when the server
                answers 5xx or the request raises. Negative values are
                treated as 0 so that at least one attempt is always made.
            timeout: Per-request timeout in seconds, passed to ``requests``.

        Raises:
            ImportError: If the ``requests`` package is not installed.
        """
        if not _REQUESTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        self._workspace_id = workspace_id
        self._shared_key = shared_key
        self._log_type = log_type
        self._batch_size = max(1, batch_size)
        self._verify_ssl = verify_ssl
        # Clamp like batch_size: a negative value would otherwise make the
        # retry loop run zero times and silently drop every batch.
        self._max_retries = max(0, max_retries)
        self._timeout = timeout
        self._url = (
            f"https://{workspace_id}.ods.opinsights.azure.com"
            f"{self._RESOURCE}?api-version={self._API_VERSION}"
        )
        self._buffer: List[Dict[str, Any]] = []

    async def export(self, decision: Any) -> bool:
        """Buffer one decision, flushing when the batch size is reached.

        Args:
            decision: A dict, a dataclass instance, or any other object (see
                ``_decision_to_dict`` for how each is converted).

        Returns:
            ``True`` if the record was buffered (and, when a flush was
            triggered, the flush succeeded); ``False`` if the flush failed
            or any exception was raised along the way.
        """
        try:
            record = self._decision_to_dict(decision)
            self._buffer.append(record)
            if len(self._buffer) >= self._batch_size:
                return await self.flush()
            return True
        except Exception as e:
            # Exporting is best-effort: never let a telemetry failure
            # propagate into the caller.
            logger.debug("SentinelConnector: export error: %s", e)
            return False

    async def flush(self) -> bool:
        """Send all buffered records as a single batch.

        The blocking HTTP call runs in the event loop's default executor so
        that retries and timeouts do not stall other coroutines.

        Returns:
            ``True`` if the buffer was empty or the batch was accepted,
            ``False`` if sending failed. A failed batch is dropped, not
            re-queued.
        """
        import asyncio

        if not self._buffer:
            return True
        # Snapshot and clear before awaiting, so records exported while the
        # POST is in flight land in the next batch instead of being lost or
        # sent twice.
        records = list(self._buffer)
        self._buffer.clear()
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._send, records)

    async def close(self) -> None:
        """Flush any remaining buffered records; the result is discarded."""
        await self.flush()

    @staticmethod
    def _decision_to_dict(decision: Any) -> Dict[str, Any]:
        """Convert ``decision`` into a plain dict suitable for buffering.

        * dict: shallow-copied.
        * dataclass instance: converted with ``dataclasses.asdict``.
        * object with ``__dict__``: shallow copy of its attributes.
        * anything else: ``{"repr": repr(decision)}``.

        Copies are returned so that later mutation of the caller's object
        does not change a record that is already buffered.
        """
        from dataclasses import asdict, is_dataclass

        if isinstance(decision, dict):
            return dict(decision)
        # is_dataclass() is also true for dataclass *types*; asdict() only
        # accepts instances.
        if is_dataclass(decision) and not isinstance(decision, type):
            try:
                return asdict(decision)
            except TypeError:
                # asdict() could not process a field; fall back to the raw
                # attribute dict below.
                pass
        attrs = getattr(decision, "__dict__", None)
        if attrs is None:
            return {"repr": repr(decision)}
        return dict(attrs)

    def _build_signature(
        self,
        date: str,
        content_length: int,
    ) -> str:
        """Build the ``Authorization`` header value for one request.

        Args:
            date: The exact string sent in the ``x-ms-date`` header.
            content_length: Length of the request body in bytes.

        Returns:
            ``"SharedKey <workspace_id>:<base64 HMAC-SHA256 signature>"``.

        Raises:
            binascii.Error: If the shared key is not valid base64.
        """
        string_to_sign = (
            f"POST\n{content_length}\napplication/json\n"
            f"x-ms-date:{date}\n{self._RESOURCE}"
        )
        key_bytes = base64.b64decode(self._shared_key)
        sig = hmac.new(
            key_bytes,
            string_to_sign.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        encoded = base64.b64encode(sig).decode("utf-8")
        return f"SharedKey {self._workspace_id}:{encoded}"

    def _send(self, records: List[Dict[str, Any]]) -> bool:
        """POST ``records`` as one JSON array, retrying transient failures.

        This method blocks; ``flush`` runs it in an executor.

        Args:
            records: Records to send. Values that ``json`` cannot encode
                natively are stringified with ``str``.

        Returns:
            ``True`` for a response status below 400. ``False`` for a 4xx
            response (not retried) or when every attempt ended in a 5xx
            response or an exception.
        """
        body = json.dumps(records, default=str)
        body_bytes = body.encode("utf-8")
        # The signature covers the byte length, so measure the encoded body.
        content_length = len(body_bytes)
        date = formatdate(usegmt=True)
        authorization = self._build_signature(date, content_length)
        headers = {
            "Content-Type": "application/json",
            "Authorization": authorization,
            "Log-Type": self._log_type,
            "x-ms-date": date,
            # NOTE(review): sent empty; presumably the service then uses
            # ingestion time as the record timestamp — confirm.
            "time-generated-field": "",
        }
        attempts = self._max_retries + 1
        for attempt in range(attempts):
            try:
                resp = _requests.post(
                    self._url,
                    data=body_bytes,
                    headers=headers,
                    verify=self._verify_ssl,
                    timeout=self._timeout,
                )
                if resp.status_code < 500:
                    # NOTE(review): 429 (throttling) is handled as a
                    # permanent 4xx failure here; consider retrying it with
                    # a delay.
                    if resp.status_code >= 400:
                        logger.warning(
                            "SentinelConnector: API returned %s: %s",
                            resp.status_code,
                            resp.text[:200],
                        )
                        return False
                    return True
                logger.debug(
                    "SentinelConnector: transient %s on attempt %d/%d",
                    resp.status_code,
                    attempt + 1,
                    attempts,
                )
            except Exception as exc:
                # Deliberately broad: any failure while posting counts as a
                # retryable attempt rather than crashing the exporter.
                logger.debug(
                    "SentinelConnector: network error on attempt %d/%d: %s",
                    attempt + 1,
                    attempts,
                    exc,
                )
        logger.warning(
            "SentinelConnector: gave up after %d attempts",
            attempts,
        )
        return False