import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from briefcase.integrations.frameworks._export_mixin import ExportMixin
logger = logging.getLogger(__name__)
# Message used for the ImportError raised by install() and
# OpenAIAgentsTracer.__init__ when the optional `agents` package is missing.
_INSTALL_HINT = (
    "openai-agents is required for OpenAIAgentsTracer. "
    "Install with: pip install openai-agents or pip install briefcase-ai[agents]"
)

# The `agents` package is optional: import it defensively so that importing
# this module never fails; the failure is deferred to install()/__init__.
try:
    from agents import (
        TracingProcessor,
        Trace,
        Span,
        add_trace_processor as _agents_add_trace_processor,
        AgentSpanData,
        FunctionSpanData,
        HandoffSpanData,
        GuardrailSpanData,
        GenerationSpanData,
    )
    _AGENTS_AVAILABLE = True
    _TracingBase = TracingProcessor
except ImportError:
    _AGENTS_AVAILABLE = False
    # Fall back to a plain base class so the `class OpenAIAgentsTracer(...)`
    # statement below can still be evaluated without the SDK installed.
    _TracingBase = object

# Processor created by install(); stays None until the first successful call,
# after which install() returns this same instance.
_INSTALLED_PROCESSOR: Optional["OpenAIAgentsTracer"] = None
def install(
    context_version: Optional[str] = None,
    async_capture: bool = True,
    exporter: Any = None,
) -> "OpenAIAgentsTracer":
    """Create and register the module-wide ``OpenAIAgentsTracer`` once.

    The first successful call builds a tracer from the given arguments,
    registers it through ``add_trace_processor`` and remembers it in
    ``_INSTALLED_PROCESSOR``. Every later call returns that same instance;
    the arguments passed to those later calls are not applied.

    Args:
        context_version: Forwarded to ``OpenAIAgentsTracer``.
        async_capture: Forwarded to ``OpenAIAgentsTracer``.
        exporter: Forwarded to ``OpenAIAgentsTracer``.

    Returns:
        The installed tracer instance.

    Raises:
        ImportError: If the ``agents`` package could not be imported.
    """
    global _INSTALLED_PROCESSOR

    # The availability check comes first, even if a tracer is already stored.
    if not _AGENTS_AVAILABLE:
        raise ImportError(_INSTALL_HINT)

    if _INSTALLED_PROCESSOR is None:
        tracer = OpenAIAgentsTracer(
            context_version=context_version,
            async_capture=async_capture,
            exporter=exporter,
        )
        _agents_add_trace_processor(tracer)
        # Only remember the tracer once registration has succeeded.
        _INSTALLED_PROCESSOR = tracer

    return _INSTALLED_PROCESSOR
class OpenAIAgentsTracer(ExportMixin, _TracingBase):
    """Trace processor that captures agent traces and spans as plain dicts.

    Each open trace is kept in ``_traces`` keyed by trace id; finished spans
    are appended to their trace's ``"spans"`` list. When a trace ends, its
    record is moved to ``_records`` and passed to ``_trigger_export``
    (presumably provided by ``ExportMixin`` -- not visible in this file).

    Every callback is best-effort: exceptions are caught and logged at debug
    level (with traceback) so that tracing never interrupts the traced run.
    """

    def __init__(
        self,
        context_version: Optional[str] = None,
        async_capture: bool = True,
        exporter: Any = None,
    ):
        """Initialise empty capture state.

        Args:
            context_version: Optional value stored on every trace record
                under ``"context_version"`` when truthy.
            async_capture: Stored on the instance as ``async_capture``; not
                read by this class itself.
            exporter: Stored on the instance as ``_exporter``; not read by
                this class itself.

        Raises:
            ImportError: If the ``agents`` package could not be imported.
        """
        if not _AGENTS_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        self.context_version = context_version
        self.async_capture = async_capture
        self._exporter = exporter
        # Traces that have started but not yet ended, keyed by trace id.
        self._traces: Dict[str, Dict[str, Any]] = {}
        # Wall-clock start time of spans that have not yet ended.
        self._span_starts: Dict[str, datetime] = {}
        # Completed trace records, in completion order.
        self._records: List[Dict[str, Any]] = []

    def on_trace_start(self, trace: Any) -> None:
        """Open a new record for ``trace`` and remember it until it ends."""
        try:
            trace_id = trace.trace_id
            record: Dict[str, Any] = {
                "trace_id": trace_id,
                "name": trace.name,
                "spans": [],
                "started_at": datetime.now(timezone.utc).isoformat(),
            }
            if self.context_version:
                record["context_version"] = self.context_version
            self._traces[trace_id] = record
        except Exception as e:
            logger.debug(
                "OpenAIAgentsTracer: on_trace_start error: %s", e, exc_info=True
            )

    def on_trace_end(self, trace: Any) -> None:
        """Close the record for ``trace``, store it and trigger an export.

        A trace that was never seen by ``on_trace_start`` is ignored.
        """
        try:
            record = self._traces.pop(trace.trace_id, None)
            if record is None:
                return
            record["ended_at"] = datetime.now(timezone.utc).isoformat()
            self._records.append(record)
            self._trigger_export(record)
        except Exception as e:
            logger.debug(
                "OpenAIAgentsTracer: on_trace_end error: %s", e, exc_info=True
            )

    def on_span_start(self, span: Any) -> None:
        """Remember when ``span`` started so its duration can be computed."""
        try:
            self._span_starts[span.span_id] = datetime.now(timezone.utc)
        except Exception as e:
            logger.debug(
                "OpenAIAgentsTracer: on_span_start error: %s", e, exc_info=True
            )

    def on_span_end(self, span: Any) -> None:
        """Build the record for ``span`` and attach it to its open trace.

        If the span's trace is not currently open, the span record is
        discarded.
        """
        try:
            span_id = span.span_id
            trace_id = span.trace_id
            started_at = self._span_starts.pop(span_id, None)
            ended_at = datetime.now(timezone.utc)
            span_record = self._build_span_record(span, started_at, ended_at)
            trace_record = self._traces.get(trace_id)
            if trace_record is not None:
                trace_record["spans"].append(span_record)
        except Exception as e:
            logger.debug(
                "OpenAIAgentsTracer: on_span_end error: %s", e, exc_info=True
            )

    def shutdown(self) -> None:
        """Drop in-flight traces and span start times.

        Completed records in ``_records`` are kept.
        """
        try:
            self._traces.clear()
            self._span_starts.clear()
        except Exception as e:
            logger.debug(
                "OpenAIAgentsTracer: shutdown error: %s", e, exc_info=True
            )

    def force_flush(self) -> None:
        """Do nothing; this class keeps no buffer of its own to flush."""
        pass

    def get_records(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the completed trace records."""
        return list(self._records)

    def clear(self) -> None:
        """Forget all completed records and all in-flight state."""
        self._records.clear()
        self._traces.clear()
        self._span_starts.clear()

    def _build_span_record(
        self,
        span: Any,
        started_at: Optional[datetime],
        ended_at: datetime,
    ) -> Dict[str, Any]:
        """Convert ``span`` into a plain dict record.

        Args:
            span: The finished span; ``span_id``, ``trace_id`` and
                ``span_data`` are read, plus ``error`` if present.
            started_at: Start time recorded by ``on_span_start``, or ``None``
                if the start was never observed.
            ended_at: Time at which the span was seen to end.

        Returns:
            A dict with ids, timestamps, ``execution_time_ms`` (only when the
            start time is known), a ``"type"`` plus type-specific fields, and
            ``"error"`` when the span carries a truthy error.
        """
        record: Dict[str, Any] = {
            "span_id": span.span_id,
            "trace_id": span.trace_id,
            "started_at": started_at.isoformat() if started_at is not None else None,
            "ended_at": ended_at.isoformat(),
        }
        if started_at is not None:
            delta = (ended_at - started_at).total_seconds()
            record["execution_time_ms"] = delta * 1000

        record.update(self._span_data_fields(span.span_data))

        error = getattr(span, "error", None)
        if error:
            record["error"] = str(error)
        return record

    @staticmethod
    def _span_data_fields(span_data: Any) -> Dict[str, Any]:
        """Return the ``"type"`` and type-specific fields for ``span_data``.

        Always yields a ``"type"`` key: unrecognised payloads fall back to
        their own ``type`` attribute, or ``"unknown"`` if they have none.
        """
        # The SDK span-data classes only exist when the import succeeded, so
        # the isinstance checks must stay behind this guard.
        if _AGENTS_AVAILABLE:
            if isinstance(span_data, AgentSpanData):
                return {
                    "type": "agent_run",
                    "agent_name": getattr(span_data, "name", None),
                    "tools": list(getattr(span_data, "tools", None) or []),
                    "handoffs": list(getattr(span_data, "handoffs", None) or []),
                    "output_type": getattr(span_data, "output_type", None),
                }
            if isinstance(span_data, FunctionSpanData):
                return {
                    "type": "tool_call",
                    "tool_name": getattr(span_data, "name", None),
                    "input": getattr(span_data, "input", None),
                    "output": getattr(span_data, "output", None),
                }
            if isinstance(span_data, HandoffSpanData):
                return {
                    "type": "handoff",
                    "from_agent": getattr(span_data, "from_agent", None),
                    "to_agent": getattr(span_data, "to_agent", None),
                }
            if isinstance(span_data, GuardrailSpanData):
                return {
                    "type": "guardrail",
                    "guardrail_name": getattr(span_data, "name", None),
                    "triggered": getattr(span_data, "triggered", False),
                }
            if isinstance(span_data, GenerationSpanData):
                return {
                    "type": "generation",
                    "model": getattr(span_data, "model", None),
                    "usage": getattr(span_data, "usage", None),
                    "input": getattr(span_data, "input", None),
                    "output": getattr(span_data, "output", None),
                }
        return {"type": getattr(span_data, "type", "unknown")}