from __future__ import annotations

import logging
import time
from typing import Any, Optional

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import SpanKind

from briefcase.exporters.base import BaseExporter


class OTelExporter(BaseExporter):
    """Export decisions as OpenTelemetry spans.

    Every exported decision becomes one ``briefcase.decision`` span whose
    attributes are produced by :meth:`_extract_attrs`. Spans are handed to a
    ``BatchSpanProcessor`` that wraps either a caller-supplied
    ``SpanExporter`` or an OTLP exporter built for the requested protocol.
    """

    # Upper bound, in milliseconds, passed to ``force_flush`` by :meth:`flush`.
    _FLUSH_TIMEOUT_MS = 5000

    def __init__(
        self,
        endpoint: str = "http://localhost:4317",
        service_name: str = "briefcase-ai",
        protocol: str = "grpc",
        span_exporter: Optional[SpanExporter] = None,
    ) -> None:
        """Set up the tracer provider, span processor and tracer.

        Args:
            endpoint: Endpoint handed to the OTLP exporter. Only used when
                ``span_exporter`` is not supplied.
            service_name: Value stored under ``SERVICE_NAME`` in the
                provider's resource.
            protocol: ``"grpc"`` or ``"http"``. Selects the OTLP exporter
                when ``span_exporter`` is not supplied; it is stored but not
                validated when a custom exporter is given.
            span_exporter: Pre-built exporter to use instead of building an
                OTLP one.

        Raises:
            ImportError: The OTLP exporter package for ``protocol`` is not
                installed (only when ``span_exporter`` is ``None``).
            ValueError: ``protocol`` is neither ``"grpc"`` nor ``"http"``
                (only when ``span_exporter`` is ``None``).
        """
        self._endpoint = endpoint
        self._service_name = service_name
        self._protocol = protocol

        resource = Resource.create({SERVICE_NAME: service_name})

        if span_exporter is not None:
            self._span_exporter: SpanExporter = span_exporter
        else:
            self._span_exporter = self._build_exporter(endpoint, protocol)

        from opentelemetry.sdk.trace.export import BatchSpanProcessor

        self._provider = TracerProvider(resource=resource)
        self._provider.add_span_processor(BatchSpanProcessor(self._span_exporter))
        self._tracer = self._provider.get_tracer("briefcase")

    @staticmethod
    def _build_exporter(endpoint: str, protocol: str) -> SpanExporter:
        """Build the OTLP span exporter matching ``protocol``.

        The exporter packages are imported lazily so that only the one that
        is actually requested needs to be installed.

        Args:
            endpoint: Endpoint passed to the exporter constructor.
            protocol: ``"grpc"`` or ``"http"``.

        Returns:
            A new OTLP span exporter for the given protocol.

        Raises:
            ImportError: The exporter package for ``protocol`` is missing.
            ValueError: ``protocol`` is not a supported value.
        """
        if protocol == "grpc":
            try:
                from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                    OTLPSpanExporter as GrpcExporter,
                )
            except ImportError as exc:
                raise ImportError(
                    "opentelemetry-exporter-otlp-proto-grpc is required for "
                    "protocol='grpc'. Install it with: "
                    "pip install 'briefcase-ai[otel-export]'"
                ) from exc
            return GrpcExporter(endpoint=endpoint)

        if protocol == "http":
            try:
                from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                    OTLPSpanExporter as HttpExporter,
                )
            except ImportError as exc:
                raise ImportError(
                    "opentelemetry-exporter-otlp-proto-http is required for "
                    "protocol='http'. Install it with: "
                    "pip install 'briefcase-ai[otel-export]'"
                ) from exc
            return HttpExporter(endpoint=endpoint)

        raise ValueError(f"Unsupported protocol: {protocol!r}. Use 'grpc' or 'http'.")

    @staticmethod
    def _field(obj: Any, name: str) -> Any:
        """Return ``obj[name]`` for dicts, else attribute ``name``; ``None`` if absent."""
        if isinstance(obj, dict):
            return obj.get(name)
        return getattr(obj, name, None)

    @staticmethod
    def _as_float(value: Any, default: float = 0.0) -> float:
        """Coerce ``value`` to ``float``; return ``default`` for ``None`` or non-numeric input."""
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _extract_attrs(decision: Any) -> dict:
        """Derive the span attributes for ``decision``.

        The decision is read purely by duck typing. Missing fields fall back
        to ``""``, ``0.0`` or ``"unrouted"``, and numeric fields that cannot
        be converted to ``float`` fall back the same way, so a malformed
        decision still yields a complete attribute set instead of raising.

        Args:
            decision: Object to describe. Nested ``outputs`` items,
                ``routing_decision`` and ``model_parameters`` may be either
                dicts or attribute-bearing objects.

        Returns:
            A dict with the keys ``briefcase.decision.id``,
            ``briefcase.context.version``, ``briefcase.confidence``,
            ``briefcase.routing.action``, ``briefcase.model.name``,
            ``briefcase.model.provider``, ``briefcase.execution.time_ms``
            and ``briefcase.snapshot.type``.
        """
        field = OTelExporter._field
        as_float = OTelExporter._as_float
        attrs: dict[str, Any] = {}

        # Identifier: ``decision_id`` wins over ``id``; falsy values fall through.
        decision_id = (
            getattr(decision, "decision_id", None)
            or getattr(decision, "id", None)
            or ""
        )
        attrs["briefcase.decision.id"] = str(decision_id)

        # Context version: prefer ``decision.context.version``, then
        # ``decision.context_version``.
        context_version = ""
        ctx = getattr(decision, "context", None)
        if ctx is not None:
            context_version = str(getattr(ctx, "version", "") or "")
        if not context_version:
            context_version = str(getattr(decision, "context_version", "") or "")
        attrs["briefcase.context.version"] = context_version

        # Confidence: taken from the outputs first; a confidence set directly
        # on the decision overrides it.
        confidence = 0.0
        outputs = getattr(decision, "outputs", None)
        if isinstance(outputs, dict):
            # An explicit ``None`` value must not reach ``float()``.
            confidence = as_float(outputs.get("confidence"))
        elif isinstance(outputs, (list, tuple)):
            candidates = (field(item, "confidence") for item in outputs)
            first_set = next((c for c in candidates if c is not None), None)
            confidence = as_float(first_set)
        direct_confidence = getattr(decision, "confidence", None)
        if direct_confidence is not None:
            # Keep the outputs-derived value if the direct one is unusable.
            confidence = as_float(direct_confidence, confidence)
        attrs["briefcase.confidence"] = confidence

        # Routing action: a missing routing decision or a ``None`` action is
        # reported as "unrouted" rather than the string "None".
        routing = getattr(decision, "routing_decision", None)
        action = field(routing, "action") if routing is not None else None
        attrs["briefcase.routing.action"] = (
            str(action) if action is not None else "unrouted"
        )

        # Model name and provider, from a dict or an object.
        model_name = ""
        model_provider = ""
        model_params = getattr(decision, "model_parameters", None)
        if model_params is not None:
            model_name = str(field(model_params, "model_name") or "")
            model_provider = str(field(model_params, "provider") or "")
        attrs["briefcase.model.name"] = model_name
        attrs["briefcase.model.provider"] = model_provider

        attrs["briefcase.execution.time_ms"] = as_float(
            getattr(decision, "execution_time_ms", None)
        )

        # Snapshot type: explicit attribute, otherwise guessed from the
        # decision's class name.
        snapshot_type = getattr(decision, "snapshot_type", None)
        if snapshot_type is None:
            cls_name = type(decision).__name__.lower()
            if "batch" in cls_name:
                snapshot_type = "batch"
            elif "session" in cls_name:
                snapshot_type = "session"
            else:
                snapshot_type = "decision"
        attrs["briefcase.snapshot.type"] = str(snapshot_type)

        return attrs

    async def export(self, decision: Any) -> bool:
        """Emit one ``briefcase.decision`` span describing ``decision``.

        The span is parented on whatever span is current when this is called
        and is ended immediately, since it carries only attributes.

        Args:
            decision: Object passed to :meth:`_extract_attrs`.

        Returns:
            ``True`` if the span was created, ``False`` if anything raised.
            Failures are logged with their traceback and never propagate.
        """
        try:
            attrs = self._extract_attrs(decision)
            parent_ctx = trace.set_span_in_context(trace.get_current_span())
            with self._tracer.start_as_current_span(
                "briefcase.decision",
                context=parent_ctx,
                kind=SpanKind.INTERNAL,
                attributes=attrs,
            ):
                pass
        except Exception:
            # Best-effort telemetry: report the failure, but never let it
            # break the caller.
            logging.getLogger(__name__).exception(
                "Failed to export decision as an OpenTelemetry span"
            )
            return False
        return True

    async def flush(self) -> None:
        """Force the provider to flush pending spans.

        ``force_flush`` is called directly (not awaited) with a timeout of
        ``_FLUSH_TIMEOUT_MS`` milliseconds; a falsy result is logged as a
        warning.
        """
        flushed = self._provider.force_flush(timeout_millis=self._FLUSH_TIMEOUT_MS)
        if not flushed:
            logging.getLogger(__name__).warning(
                "OpenTelemetry force_flush did not complete within %d ms",
                self._FLUSH_TIMEOUT_MS,
            )

    async def close(self) -> None:
        """Shut down the tracer provider."""
        self._provider.shutdown()

    @property
    def protocol(self) -> str:
        """Protocol name given at construction time."""
        return self._protocol

    @property
    def span_exporter(self) -> SpanExporter:
        """Span exporter wrapped by the batch span processor."""
        return self._span_exporter