from __future__ import annotations
import asyncio
import hashlib
import json
import time
from typing import Any, Dict, Optional, Tuple
from briefcase.routing.base import BaseRouter, RoutingDecision
from briefcase.routing.internal import InternalRouter
try:
import httpx as httpx
except ImportError:
httpx = None
class _LRUCache:
def __init__(self, ttl_seconds: float) -> None:
self._ttl = ttl_seconds
self._store: Dict[str, Tuple[RoutingDecision, float]] = {}
def get(self, key: str) -> Optional[RoutingDecision]:
entry = self._store.get(key)
if entry is None:
return None
decision, inserted_at = entry
if time.monotonic() - inserted_at > self._ttl:
del self._store[key]
return None
return decision
def set(self, key: str, value: RoutingDecision) -> None:
self._store[key] = (value, time.monotonic())
class OPARouter(BaseRouter):
def __init__(
self,
endpoint: str,
timeout_ms: float = 50.0,
cache_ttl_seconds: float = 60.0,
fallback_threshold: float = 0.85,
) -> None:
if httpx is None:
raise ImportError(
"httpx is required for OPARouter. "
"Install it with: pip install 'briefcase-ai[splunk]'"
)
self._endpoint = endpoint
self._timeout_s = timeout_ms / 1000.0
self._cache = _LRUCache(cache_ttl_seconds)
self._fallback = InternalRouter(confidence_threshold=fallback_threshold)
@staticmethod
def _build_input(decision_context: Any) -> dict:
confidence = getattr(decision_context, "confidence", None)
ctx = getattr(decision_context, "context", None)
context_version = (
str(getattr(ctx, "version", "") or "")
if ctx is not None
else str(getattr(decision_context, "context_version", "") or "")
)
model_params = getattr(decision_context, "model_parameters", None)
if isinstance(model_params, dict):
model_name = model_params.get("model_name", "")
else:
model_name = str(getattr(model_params, "model_name", "") or "") if model_params else ""
tags = getattr(decision_context, "tags", {}) or {}
return {
"confidence": float(confidence) if confidence is not None else None,
"context_version": context_version,
"model_name": model_name,
"tags": tags,
}
@staticmethod
def _cache_key(input_doc: dict) -> str:
serialised = json.dumps(input_doc, sort_keys=True, default=str)
return hashlib.sha256(serialised.encode()).hexdigest()
async def route(self, decision_context: Any) -> RoutingDecision:
start = time.monotonic()
input_doc = self._build_input(decision_context)
cache_key = self._cache_key(input_doc)
cached = self._cache.get(cache_key)
if cached is not None:
return cached
action = "human_review"
reason = None
source = "opa"
try:
async with httpx.AsyncClient(timeout=self._timeout_s) as client:
response = await client.post(
self._endpoint,
json={"input": input_doc},
)
response.raise_for_status()
data = response.json()
result = data.get("result", {})
action = result.get("action", "human_review")
reason = result.get("reason")
except Exception:
fallback_result = await self._fallback.route(decision_context)
eval_time_ms = (time.monotonic() - start) * 1000.0
decision = RoutingDecision(
action=fallback_result.action,
source="internal",
eval_time_ms=eval_time_ms,
reason="OPA unavailable; fallback to internal router",
)
await self._maybe_emit(decision, decision_context)
return decision
eval_time_ms = (time.monotonic() - start) * 1000.0
decision = RoutingDecision(
action=action,
source=source,
eval_time_ms=eval_time_ms,
reason=reason,
)
self._cache.set(cache_key, decision)
await self._maybe_emit(decision, decision_context)
return decision
@staticmethod
async def _maybe_emit(decision: RoutingDecision, decision_context: Any) -> None:
if decision.action == "human_review":
try:
from briefcase.events.emitter import emit_low_confidence
confidence = getattr(decision_context, "confidence", 0.0) or 0.0
threshold = 0.85 await emit_low_confidence(decision_context, float(confidence), threshold)
except Exception:
pass