import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from briefcase.integrations.frameworks._export_mixin import ExportMixin
# Module-level logger for this integration.
logger = logging.getLogger(__name__)

# Message carried by the ImportError raised when autogen-agentchat is missing.
_INSTALL_HINT = (
    "autogen-agentchat is required for AutoGenEventHandler. "
    "Install with: pip install autogen-agentchat or pip install briefcase-ai[autogen]"
)

# Optional dependency: record whether autogen-agentchat could be imported so
# the rest of the module can fail with a helpful message instead of a
# NameError.
try:
    from autogen_agentchat import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME
    _AUTOGEN_AVAILABLE = True
except ImportError:
    _AUTOGEN_AVAILABLE = False
    # Fallback logger names so the constants are always defined.
    # NOTE(review): presumably these mirror the values exported by
    # autogen_agentchat - confirm against the installed version.
    EVENT_LOGGER_NAME = "autogen_agentchat.event"
    TRACE_LOGGER_NAME = "autogen_agentchat.trace"

# Handler created by install(); stays None until install() succeeds once.
_INSTALLED_HANDLER: Optional["AutoGenEventHandler"] = None
def install(
    context_version: Optional[str] = None,
    async_capture: bool = True,
    exporter: Any = None,
) -> "AutoGenEventHandler":
    """Create, attach and remember a module-wide ``AutoGenEventHandler``.

    The first successful call builds a handler from the given arguments,
    attaches it and stores it in ``_INSTALLED_HANDLER``. Every later call
    returns that stored handler unchanged; its arguments are ignored.

    Args:
        context_version: Forwarded to the handler constructor.
        async_capture: Forwarded to the handler constructor.
        exporter: Forwarded to the handler constructor.

    Returns:
        The installed handler instance.

    Raises:
        ImportError: If autogen-agentchat could not be imported.
    """
    global _INSTALLED_HANDLER

    if not _AUTOGEN_AVAILABLE:
        raise ImportError(_INSTALL_HINT)

    if _INSTALLED_HANDLER is None:
        new_handler = AutoGenEventHandler(
            context_version=context_version,
            async_capture=async_capture,
            exporter=exporter,
        )
        new_handler.attach()
        # Only remember the handler once it has been attached successfully.
        _INSTALLED_HANDLER = new_handler

    return _INSTALLED_HANDLER
class AutoGenEventHandler(ExportMixin, logging.Handler):
    """Logging handler that converts AutoGen log records into decision records.

    Once attached (see :meth:`attach`), every log record reaching :meth:`emit`
    is turned into a dictionary, appended to an in-memory list and passed to
    ``self._trigger_export``.

    NOTE(review): ``_trigger_export`` is not defined in this class; presumably
    it is provided by ``ExportMixin`` - confirm.
    """

    def __init__(
        self,
        context_version: Optional[str] = None,
        async_capture: bool = True,
        capture_events: bool = True,
        capture_traces: bool = True,
        max_input_chars: int = 10000,
        max_output_chars: int = 10000,
        exporter: Any = None,
    ):
        """Initialise the handler.

        Args:
            context_version: When not ``None``, added to every record under
                the ``"context_version"`` key.
            async_capture: Stored on the instance; not read by this class.
            capture_events: When false, records whose logger name equals
                ``EVENT_LOGGER_NAME`` are dropped.
            capture_traces: When false, records whose logger name equals
                ``TRACE_LOGGER_NAME`` are dropped.
            max_input_chars: Truncation limit passed to
                ``_extract_event_data``.
            max_output_chars: Stored on the instance; not read by this class.
            exporter: Stored as ``self._exporter``; not used directly here
                (presumably consumed by ``ExportMixin`` - confirm).

        Raises:
            ImportError: If autogen-agentchat could not be imported.
        """
        if not _AUTOGEN_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        super().__init__()
        self.context_version = context_version
        self.async_capture = async_capture
        self.capture_events = capture_events
        self.capture_traces = capture_traces
        self.max_input_chars = max_input_chars
        self.max_output_chars = max_output_chars
        self._exporter = exporter
        # Captured decision records, in arrival order.
        self._records: List[Dict[str, Any]] = []
        # Names of the loggers this handler has been attached to.
        self._attached_loggers: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        """Convert *record* into a decision record, store it and export it.

        Records from the event/trace loggers are skipped when the matching
        ``capture_*`` flag is false. Failures never propagate to the code
        that logged the record; they are reported through
        ``logging.Handler.handleError`` instead of being silently discarded.
        """
        try:
            source_logger = record.name
            if source_logger == EVENT_LOGGER_NAME and not self.capture_events:
                return
            if source_logger == TRACE_LOGGER_NAME and not self.capture_traces:
                return
            event_data = _extract_event_data(record.msg, self.max_input_chars)
            briefcase_record = self._build_record(event_data, source_logger)
            self._records.append(briefcase_record)
            self._trigger_export(briefcase_record)
        except Exception:
            # Standard logging contract for emit(): report the failure (to
            # stderr while logging.raiseExceptions is true) without raising.
            self.handleError(record)

    def get_records(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the captured records."""
        return list(self._records)

    def clear(self) -> None:
        """Discard all captured records."""
        self._records.clear()

    @property
    def decision_count(self) -> int:
        """Number of records currently held in memory."""
        return len(self._records)

    def attach(self) -> None:
        """Register this handler on the AutoGen event and trace loggers.

        Both loggers are also set to ``DEBUG`` level.

        Raises:
            ImportError: If autogen-agentchat could not be imported.
        """
        require_autogen()
        for name in (EVENT_LOGGER_NAME, TRACE_LOGGER_NAME):
            lg = logging.getLogger(name)
            lg.addHandler(self)
            lg.setLevel(logging.DEBUG)
            # Track each logger name once so detach() knows what to undo.
            if name not in self._attached_loggers:
                self._attached_loggers.append(name)

    def detach(self) -> None:
        """Remove this handler from every logger it was attached to.

        Logger levels changed by :meth:`attach` are left as they are.
        """
        for name in list(self._attached_loggers):
            lg = logging.getLogger(name)
            lg.removeHandler(self)
        self._attached_loggers.clear()

    def _build_record(
        self,
        event_data: Dict[str, Any],
        logger_name: str,
    ) -> Dict[str, Any]:
        """Assemble the decision record for one extracted event.

        Args:
            event_data: Mapping that may carry ``"decision_type"``,
                ``"inputs"`` and ``"outputs"``; missing keys fall back to
                ``"autogen_event"`` and empty dicts respectively.
            logger_name: Name of the logger the record came from.

        Returns:
            A dict with a fresh UUID4 ``decision_id``, a UTC ISO-8601
            ``captured_at`` timestamp, the logger name, the inputs/outputs,
            and ``context_version`` when one was configured.
        """
        record: Dict[str, Any] = {
            "decision_id": str(uuid.uuid4()),
            "decision_type": event_data.get("decision_type", "autogen_event"),
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "logger": logger_name,
            "inputs": event_data.get("inputs", {}),
            "outputs": event_data.get("outputs", {}),
        }
        if self.context_version is not None:
            record["context_version"] = self.context_version
        return record
def require_autogen() -> None:
    """Raise ``ImportError`` with the install hint when AutoGen is unavailable.

    Returns silently when the optional dependency was imported successfully.
    """
    if _AUTOGEN_AVAILABLE:
        return
    raise ImportError(_INSTALL_HINT)
def _extract_event_data(msg: Any, max_chars: int = 10000) -> Dict[str, Any]:
    """Classify a logged message and reduce it to plain, truncated data.

    Classification is duck-typed on the attributes the message exposes
    (``source``, ``content``, ``tool_calls``, ``results``) and on substrings
    of its class name (``Stop``, ``Handoff``, ``Summary``).

    Args:
        msg: The object that was logged (any type).
        max_chars: Maximum length of each stringified value.

    Returns:
        A dict with ``"decision_type"``, ``"inputs"`` and ``"outputs"`` keys.
    """
    kind = type(msg).__name__

    def _event(decision_type: str, inputs: Dict[str, Any], outputs: Any = None) -> Dict[str, Any]:
        # Common envelope shared by every branch below.
        return {
            "decision_type": decision_type,
            "inputs": inputs,
            "outputs": {} if outputs is None else outputs,
        }

    def _text_of(attr: str) -> str:
        # Stringify an attribute, tolerating its absence.
        return _safe_str(getattr(msg, attr, ""), max_chars)

    # Chat-style messages: have source + content but carry no tool calls.
    if _has_attrs(msg, "source", "content") and not _has_attrs(msg, "tool_calls"):
        body = _text_of("content")
        sender = _text_of("source")
        if "Stop" in kind:
            return _event(
                "control_message",
                {"source": sender, "content": body, "subtype": "stop"},
            )
        if "Handoff" in kind:
            recipient = _text_of("target")
            return _event(
                "control_message",
                {"source": sender, "target": recipient, "content": body, "subtype": "handoff"},
            )
        label = "tool_call_summary" if "Summary" in kind else "text_message"
        return _event(label, {"source": sender, "content": body})

    # Tool call requested but not yet executed.
    if _has_attrs(msg, "tool_calls") and not _has_attrs(msg, "results"):
        serialized_calls = _safe_serialize(getattr(msg, "tool_calls", []), max_chars)
        sender = _text_of("source")
        return _event("tool_call_request", {"source": sender, "tool_calls": serialized_calls})

    # Tool call results are reported as outputs.
    if _has_attrs(msg, "results"):
        serialized_results = _safe_serialize(getattr(msg, "results", []), max_chars)
        sender = _text_of("source")
        return _event("tool_call_execution", {"source": sender}, {"results": serialized_results})

    # Anything else is a generic event; plain strings are kept as-is.
    if isinstance(msg, str):
        return _event("autogen_event", {"raw": msg[:max_chars]})
    return _event("autogen_event", {"type": kind, "raw": _safe_str(msg, max_chars)})
def _has_attrs(obj: Any, *attrs: str) -> bool:
    """Return True when *obj* exposes every attribute named in *attrs*.

    With no names given the result is True.
    """
    for attr_name in attrs:
        if not hasattr(obj, attr_name):
            return False
    return True
def _safe_str(obj: Any, max_chars: int = 10000) -> str:
    """Return ``str(obj)`` cut to *max_chars*, never raising.

    If stringifying or slicing raises, the placeholder
    ``"<unserializable>"`` is returned instead.
    """
    try:
        rendered = str(obj)
        clipped = rendered[:max_chars]
    except Exception:
        clipped = "<unserializable>"
    return clipped
def _safe_serialize(obj: Any, max_chars: int = 10000) -> Any:
    """Reduce *obj* to truncated strings without ever raising.

    ``None`` is passed through. A list becomes a list of the stringified
    first 50 items. Anything else becomes a single truncated string. Any
    failure yields ``"<unserializable>"``.
    """
    try:
        if isinstance(obj, list):
            leading_items = obj[:50]
            return [_safe_str(entry, max_chars) for entry in leading_items]
        return None if obj is None else _safe_str(obj, max_chars)
    except Exception:
        return "<unserializable>"