from __future__ import annotations
import contextlib
import functools
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional
# Names exported by a star-import of this module; the underscore-prefixed
# no-op helper classes are intentionally left out.
__all__ = [
    "A1Tracer",
    "A1Span",
    "noop_tracer",
]
# OpenTelemetry is an optional dependency. When any of these imports fails the
# flag is False and the OpenTelemetry names below are left undefined, so every
# use of them elsewhere must be guarded by ``_OTEL_AVAILABLE``.
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.sdk.resources import Resource
except ImportError:
    _OTEL_AVAILABLE = False
else:
    _OTEL_AVAILABLE = True
@dataclass
class A1Span:
    """Plain record holding the data of one authorization span.

    All fields except ``error`` are required and positional, in the order
    declared below; ``error`` defaults to ``None``.
    """

    # Identifiers of the span; either may be None.
    trace_id: Optional[str]
    span_id: Optional[str]

    # Outcome and subject of the authorization.
    authorized: bool
    capability: str
    namespace: Optional[str]
    fingerprint: Optional[str]

    # Elapsed time (milliseconds, going by the field name).
    duration_ms: float

    # Error text, if any.
    error: Optional[str] = None
class _NoopSpan:
    """Inert stand-in for a tracing span.

    Every recording method accepts its arguments and discards them, and the
    object works as a context manager that yields itself and never suppresses
    exceptions.
    """

    trace_id: Optional[str] = None
    span_id: Optional[str] = None

    def set_attribute(self, _key: str, _val: Any) -> None:
        """Ignore the attribute."""
        return None

    def set_status(self, *_args: Any, **_kwargs: Any) -> None:
        """Ignore the status, whatever form it is passed in."""
        return None

    def record_exception(self, _exc: Exception) -> None:
        """Ignore the exception."""
        return None

    def __enter__(self) -> "_NoopSpan":
        return self

    def __exit__(self, *_args: Any) -> None:
        # Returning None lets any in-flight exception keep propagating.
        return None
class _NoopTracer:
    """Tracer stand-in that hands out ``_NoopSpan`` objects."""

    def start_as_current_span(self, _name: str, **_kwargs: Any):
        """Return a fresh no-op span; the name and any options are ignored."""
        inert_span = _NoopSpan()
        return inert_span
class A1Tracer:
    """Wraps authorization calls in tracing spans.

    Uses OpenTelemetry when it is importable and falls back to a no-op
    tracer otherwise, so callers never need to check for its presence.
    """

    def __init__(
        self,
        service_name: str = "a1-agent",
        otlp_endpoint: Optional[str] = None,
        tracer_name: str = "a1",
    ) -> None:
        """Select the underlying tracer.

        Args:
            service_name: Value of the ``service.name`` resource attribute.
                Only used when ``otlp_endpoint`` is given.
            otlp_endpoint: Base URL of an OTLP/HTTP collector. When set, a
                dedicated provider exporting to ``<endpoint>/v1/traces`` is
                created; when empty or None, the globally configured
                OpenTelemetry tracer is used.
            tracer_name: Name passed to ``get_tracer``.

        Any failure while setting up OpenTelemetry (including a missing
        exporter package) silently degrades to the no-op tracer.
        """
        self._tracer_name = tracer_name
        self._tracer: Any = None

        if not _OTEL_AVAILABLE:
            self._tracer = _NoopTracer()
            return

        if otlp_endpoint:
            try:
                # Imported lazily: the OTLP exporter is a separate package.
                from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                    OTLPSpanExporter,
                )

                resource = Resource(attributes={
                    "service.name": service_name,
                    "a1.provenance": "64796f6c6f",
                    "a1.version": "2.8.0",
                })
                provider = TracerProvider(resource=resource)
                # Drop trailing slashes so "http://host/" does not turn into
                # "http://host//v1/traces".
                traces_url = f"{otlp_endpoint.rstrip('/')}/v1/traces"
                provider.add_span_processor(
                    BatchSpanProcessor(OTLPSpanExporter(endpoint=traces_url))
                )
                self._tracer = provider.get_tracer(tracer_name)
            except Exception:
                # Best effort: tracing must never prevent construction.
                self._tracer = _NoopTracer()
        else:
            try:
                self._tracer = trace.get_tracer(tracer_name)
            except Exception:
                self._tracer = _NoopTracer()

    @contextlib.contextmanager
    def authorization_span(self, capability: str, namespace: Optional[str] = None):
        """Open a span named ``dyolo.a1.authorize/<capability>``.

        Yields a mutable dict with the keys ``authorized``, ``fingerprint``,
        ``error`` and ``namespace``. The caller fills it in inside the
        ``with`` body; its final content is copied onto the span as
        ``a1.*`` attributes when the body finishes.

        If the body raises an ``Exception``, the dict is marked as not
        authorized, the failure is recorded on the span (attributes,
        exception event and ERROR status) and the exception is re-raised.
        ``a1.duration_ms`` is always set, rounded to two decimals.
        """
        start = time.perf_counter()
        ctx: dict[str, Any] = {
            "authorized": False,
            "fingerprint": None,
            "error": None,
            "namespace": namespace,
        }
        span_name = f"dyolo.a1.authorize/{capability}"
        # record_exception=False: the except branch below records the
        # exception itself; leaving the default on would make the
        # OpenTelemetry context manager add a second, duplicate event.
        with self._tracer.start_as_current_span(
            span_name, record_exception=False
        ) as span:
            try:
                span.set_attribute("a1.capability", capability)
                span.set_attribute("a1.provenance", "64796f6c6f")
                if namespace:
                    span.set_attribute("a1.namespace", namespace)

                yield ctx

                # The body may have resolved a more specific namespace.
                final_namespace = ctx.get("namespace")
                if final_namespace and final_namespace != namespace:
                    span.set_attribute("a1.namespace", final_namespace)

                span.set_attribute("a1.authorized", ctx.get("authorized", False))
                if ctx.get("fingerprint"):
                    span.set_attribute("a1.chain_fingerprint", ctx["fingerprint"])
                if ctx.get("error"):
                    span.set_attribute("a1.error", ctx["error"])
                    if _OTEL_AVAILABLE:
                        span.set_status(trace.StatusCode.ERROR, ctx["error"])
                elif ctx.get("authorized"):
                    if _OTEL_AVAILABLE:
                        span.set_status(trace.StatusCode.OK)
            except Exception as exc:
                ctx["error"] = str(exc)
                ctx["authorized"] = False
                # Make failed spans carry the same outcome attributes as
                # successful ones so they can be filtered the same way.
                span.set_attribute("a1.authorized", False)
                span.set_attribute("a1.error", ctx["error"])
                if hasattr(span, "record_exception"):
                    span.record_exception(exc)
                if _OTEL_AVAILABLE:
                    span.set_status(trace.StatusCode.ERROR, str(exc))
                raise
            finally:
                duration_ms = (time.perf_counter() - start) * 1000
                span.set_attribute("a1.duration_ms", round(duration_ms, 2))

    def trace_capability(self, capability: str, namespace: Optional[str] = None) -> Callable:
        """Return a decorator that runs the function inside an authorization span.

        Works for both plain and ``async def`` functions. The span is marked
        authorized only if the wrapped function returns without raising.
        """
        def decorator(fn: Callable) -> Callable:
            import asyncio

            if asyncio.iscoroutinefunction(fn):
                @functools.wraps(fn)
                async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                    with self.authorization_span(capability, namespace) as ctx:
                        result = await fn(*args, **kwargs)
                        ctx["authorized"] = True
                        return result
                return async_wrapper
            else:
                @functools.wraps(fn)
                def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                    with self.authorization_span(capability, namespace) as ctx:
                        result = fn(*args, **kwargs)
                        ctx["authorized"] = True
                        return result
                return sync_wrapper
        return decorator

    def instrument_passport_client(self, client: Any) -> Any:
        """Return a proxy around ``client`` whose ``guard`` call is traced.

        The proxy's ``guard`` awaits ``client.guard`` inside an authorization
        span and copies ``fingerprint_hex`` and ``passport_namespace`` from
        the result (when present) into the span context. Every other
        attribute lookup is forwarded to ``client`` unchanged.
        """
        tracer = self

        class _InstrumentedPassportClient:
            def __getattr__(self, name: str) -> Any:
                # Only reached for names not defined on the proxy itself.
                return getattr(client, name)

            async def guard(
                self,
                capability: str,
                chain: Any,
                executor_pk_hex: str,
                params: Optional[dict] = None,
            ) -> Any:
                ns = getattr(client, "_namespace", None)
                with tracer.authorization_span(capability, ns) as ctx:
                    result = await client.guard(capability, chain, executor_pk_hex, params)
                    ctx["authorized"] = True
                    ctx["fingerprint"] = getattr(result, "fingerprint_hex", None)
                    ctx["namespace"] = getattr(result, "passport_namespace", ns)
                    return result

        return _InstrumentedPassportClient()

    @staticmethod
    def current_trace_id() -> Optional[str]:
        """Return the current trace id as 32 lowercase hex digits.

        Returns None when OpenTelemetry is unavailable, when the id is zero,
        or when reading the current span fails.
        """
        if not _OTEL_AVAILABLE:
            return None
        try:
            span_ctx = trace.get_current_span().get_span_context()
            tid = span_ctx.trace_id
            return format(tid, "032x") if tid else None
        except Exception:
            return None

    @staticmethod
    def current_span_id() -> Optional[str]:
        """Return the current span id as 16 lowercase hex digits.

        Returns None when OpenTelemetry is unavailable, when the id is zero,
        or when reading the current span fails.
        """
        if not _OTEL_AVAILABLE:
            return None
        try:
            span_ctx = trace.get_current_span().get_span_context()
            sid = span_ctx.span_id
            return format(sid, "016x") if sid else None
        except Exception:
            return None
# Shared tracer that never emits spans. It is created with ``__new__`` so
# that ``A1Tracer.__init__`` (and any OpenTelemetry setup it performs) is
# skipped; the two attributes __init__ would normally set are assigned by hand.
noop_tracer: A1Tracer = A1Tracer.__new__(A1Tracer)
noop_tracer._tracer = _NoopTracer()
noop_tracer._tracer_name = "a1-noop"