import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from briefcase.integrations.frameworks._export_mixin import ExportMixin
logger = logging.getLogger(__name__)

# Message raised as an ImportError when the optional ag2 dependency is missing.
_INSTALL_HINT = (
    "ag2 is required for AG2HookTracer. "
    "Install with: pip install ag2 or pip install briefcase-ai[ag2]"
)

# Optional dependency: record whether ``autogen`` is importable instead of
# failing at import time, so this module can always be imported.
try:
    from autogen import ConversableAgent

    _AG2_AVAILABLE = True
except ImportError:
    _AG2_AVAILABLE = False
    # Keep the name bound so later references do not raise NameError.
    ConversableAgent = None
def instrument_agent(
    agent: Any,
    context_version: Optional[str] = None,
    async_capture: bool = True,
    exporter: Any = None,
) -> "AG2HookTracer":
    """Create an ``AG2HookTracer`` and attach it to a single agent.

    Args:
        agent: The agent to instrument; handed to ``AG2HookTracer.instrument``.
        context_version: Forwarded to the tracer constructor.
        async_capture: Forwarded to the tracer constructor.
        exporter: Forwarded to the tracer constructor.

    Returns:
        The tracer that was created and attached to ``agent``.
    """
    hook_tracer = AG2HookTracer(
        exporter=exporter,
        async_capture=async_capture,
        context_version=context_version,
    )
    hook_tracer.instrument(agent)
    return hook_tracer
class AG2HookTracer(ExportMixin):
    """Capture agent activity as decision records through registered hooks.

    ``instrument`` registers one callback per enabled hook on an agent via
    ``agent.register_hook``. Every callback builds a record dict, stores it in
    memory, passes it to ``_trigger_export`` (not defined in this class;
    presumably supplied by ``ExportMixin`` -- confirm), and returns the
    hooked value unchanged. Capture is best-effort: a failure while recording
    is logged at DEBUG level and never propagates to the agent.
    """

    # Names of the hooks this tracer knows how to register.
    _HOOK_NAMES = [
        "process_message_before_send",
        "process_all_messages_before_reply",
        "update_agent_state",
        "safeguard_llm_inputs",
        "safeguard_llm_outputs",
        "safeguard_tool_inputs",
        "safeguard_tool_outputs",
    ]

    def __init__(
        self,
        context_version: Optional[str] = None,
        async_capture: bool = True,
        capture_messages: bool = True,
        capture_llm: bool = True,
        capture_tools: bool = True,
        capture_state: bool = True,
        max_input_chars: int = 10000,
        max_output_chars: int = 10000,
        exporter: Any = None,
    ):
        """Configure the tracer.

        Args:
            context_version: When not ``None``, added to every record under
                the ``"context_version"`` key.
            async_capture: Stored on the instance; not read by the code in
                this class (presumably consumed by the export mixin --
                confirm).
            capture_messages: Register the message send/context hooks.
            capture_llm: Register the LLM input/output hooks.
            capture_tools: Register the tool input/output hooks.
            capture_state: Register the state update hook.
            max_input_chars: Truncation limit for captured messages, state
                and tool calls.
            max_output_chars: Truncation limit for captured LLM responses and
                tool results.
            exporter: Stored as ``_exporter``; not used directly here.

        Raises:
            ImportError: If the ag2 package could not be imported.
        """
        if not _AG2_AVAILABLE:
            raise ImportError(_INSTALL_HINT)
        self.context_version = context_version
        self.async_capture = async_capture
        self.capture_messages = capture_messages
        self.capture_llm = capture_llm
        self.capture_tools = capture_tools
        self.capture_state = capture_state
        self.max_input_chars = max_input_chars
        self.max_output_chars = max_output_chars
        self._exporter = exporter
        self._records: List[Dict[str, Any]] = []

    def get_records(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the records captured so far."""
        return list(self._records)

    def clear(self) -> None:
        """Discard all captured records."""
        self._records.clear()

    @property
    def decision_count(self) -> int:
        """Number of records currently held in memory."""
        return len(self._records)

    def instrument(self, agent: Any) -> None:
        """Register the enabled hooks on ``agent``.

        Raises:
            ImportError: If the ag2 package could not be imported.
        """
        require_ag2()
        self._register_hooks(agent)

    def instrument_many(self, agents: List[Any]) -> None:
        """Call ``instrument`` on each agent, in order."""
        for agent in agents:
            self.instrument(agent)

    def _register_hooks(self, agent: Any) -> None:
        """Register one callback per enabled capture category on ``agent``."""
        register = agent.register_hook
        if self.capture_messages:
            register(
                "process_message_before_send",
                self._make_message_send_hook(),
            )
            register(
                "process_all_messages_before_reply",
                self._make_message_context_hook(agent),
            )
        if self.capture_state:
            register(
                "update_agent_state",
                self._make_state_update_hook(agent),
            )
        if self.capture_llm:
            register(
                "safeguard_llm_inputs",
                self._make_llm_input_hook(agent),
            )
            register(
                "safeguard_llm_outputs",
                self._make_llm_output_hook(agent),
            )
        if self.capture_tools:
            register(
                "safeguard_tool_inputs",
                self._make_tool_input_hook(agent),
            )
            register(
                "safeguard_tool_outputs",
                self._make_tool_output_hook(agent),
            )

    @staticmethod
    def _log_capture_failure(decision_type: str) -> None:
        """Log a swallowed capture error; call only from an ``except`` block."""
        logger.debug(
            "AG2HookTracer failed to capture %s record",
            decision_type,
            exc_info=True,
        )

    def _safe_extract_messages(self, messages: Any) -> List[Any]:
        """Extract each entry of a message list; non-list input yields ``[]``."""
        if not isinstance(messages, list):
            return []
        return [self._safe_extract_message(m) for m in messages]

    def _make_message_send_hook(self):
        """Build the ``process_message_before_send`` callback."""
        tracer = self

        # NOTE(review): AG2's ConversableAgent is understood to invoke this
        # hook with keyword arguments that include ``sender=``; the optional
        # parameter keeps such a call from raising TypeError. Confirm against
        # the installed AG2 version. ``sender`` is accepted but not recorded.
        def _hook(message, recipient, silent, sender=None):
            try:
                record = tracer._build_record(
                    decision_type="message_send",
                    inputs={
                        "content": tracer._safe_extract_message(message),
                        "recipient": _agent_name(recipient),
                        "silent": silent,
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                # Best-effort capture: never break the agent's send path.
                tracer._log_capture_failure("message_send")
            return message

        return _hook

    def _make_message_context_hook(self, agent: Any):
        """Build the ``process_all_messages_before_reply`` callback."""
        tracer = self

        def _hook(messages):
            try:
                is_list = isinstance(messages, list)
                record = tracer._build_record(
                    decision_type="message_context",
                    inputs={
                        "agent": _agent_name(agent),
                        "message_count": len(messages) if is_list else 0,
                        "messages": tracer._safe_extract_messages(messages),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("message_context")
            return messages

        return _hook

    def _make_state_update_hook(self, agent: Any):
        """Build the ``update_agent_state`` callback."""
        tracer = self

        # NOTE(review): AG2's ConversableAgent is understood to call these
        # hooks as ``hook(agent, messages)``. When extra positional arguments
        # are present the last one is treated as the payload; a single
        # argument call behaves exactly as before. Confirm against the
        # installed AG2 version.
        def _hook(agent_state=None, *trailing):
            payload = trailing[-1] if trailing else agent_state
            try:
                record = tracer._build_record(
                    decision_type="state_update",
                    inputs={
                        "agent": _agent_name(agent),
                        "state": _safe_serialize_small(
                            payload, tracer.max_input_chars
                        ),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("state_update")
            return payload

        return _hook

    def _make_llm_input_hook(self, agent: Any):
        """Build the ``safeguard_llm_inputs`` callback."""
        tracer = self

        def _hook(messages):
            try:
                record = tracer._build_record(
                    decision_type="llm_input",
                    inputs={
                        "agent": _agent_name(agent),
                        "messages": tracer._safe_extract_messages(messages),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("llm_input")
            return messages

        return _hook

    def _make_llm_output_hook(self, agent: Any):
        """Build the ``safeguard_llm_outputs`` callback."""
        tracer = self

        def _hook(response):
            try:
                record = tracer._build_record(
                    decision_type="llm_output",
                    outputs={
                        "agent": _agent_name(agent),
                        "response": _safe_serialize_small(
                            response, tracer.max_output_chars
                        ),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("llm_output")
            return response

        return _hook

    def _make_tool_input_hook(self, agent: Any):
        """Build the ``safeguard_tool_inputs`` callback."""
        tracer = self

        def _hook(tool_call):
            try:
                record = tracer._build_record(
                    decision_type="tool_input",
                    inputs={
                        "agent": _agent_name(agent),
                        "tool_call": _safe_serialize_small(
                            tool_call, tracer.max_input_chars
                        ),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("tool_input")
            return tool_call

        return _hook

    def _make_tool_output_hook(self, agent: Any):
        """Build the ``safeguard_tool_outputs`` callback."""
        tracer = self

        def _hook(tool_result):
            try:
                record = tracer._build_record(
                    decision_type="tool_output",
                    outputs={
                        "agent": _agent_name(agent),
                        "result": _safe_serialize_small(
                            tool_result, tracer.max_output_chars
                        ),
                    },
                )
                tracer._append_and_export(record)
            except Exception:
                tracer._log_capture_failure("tool_output")
            return tool_result

        return _hook

    def _build_record(
        self,
        decision_type: str,
        inputs: Optional[Dict[str, Any]] = None,
        outputs: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Assemble a record dict with a fresh UUID and a UTC timestamp.

        ``inputs`` and ``outputs`` default to empty dicts; the
        ``"context_version"`` key is present only when one was configured.
        """
        record: Dict[str, Any] = {
            "decision_id": str(uuid.uuid4()),
            "decision_type": decision_type,
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "inputs": inputs or {},
            "outputs": outputs or {},
        }
        if self.context_version is not None:
            record["context_version"] = self.context_version
        return record

    def _append_and_export(self, record: Dict[str, Any]) -> None:
        """Store ``record`` in memory, then pass it to ``_trigger_export``."""
        self._records.append(record)
        self._trigger_export(record)

    def _safe_extract_message(self, message: Any) -> Any:
        """Extract ``message`` using this tracer's ``max_input_chars`` limit."""
        return _safe_extract_message(message, self.max_input_chars)
def require_ag2() -> None:
    """Ensure the optional ag2 dependency was importable.

    Raises:
        ImportError: With the install hint, when ag2 is unavailable.
    """
    if _AG2_AVAILABLE:
        return
    raise ImportError(_INSTALL_HINT)
def _agent_name(agent: Any) -> Optional[str]:
    """Return a printable name for ``agent``, or ``None`` if unavailable.

    Uses the agent's ``name`` attribute when it is truthy and falls back to
    ``str(agent)`` otherwise. The result is always coerced to ``str`` so it
    matches the declared return type.

    Args:
        agent: Any object; ``None`` means "no agent".

    Returns:
        The name as a string, or ``None`` when ``agent`` is ``None`` or when
        reading/converting the name raises.
    """
    if agent is None:
        # Avoid reporting the literal string "None" for a missing agent.
        return None
    try:
        name = getattr(agent, "name", None)
        return str(name) if name else str(agent)
    except Exception:
        return None
def _safe_extract_message(message: Any, max_chars: int = 10000) -> Any:
    """Reduce a message to a small, truncated representation.

    Args:
        message: ``None``, a string, a dict with optional ``"role"`` and
            ``"content"`` keys, or any other object.
        max_chars: Maximum number of characters kept from text content.

    Returns:
        * ``None`` for ``None``.
        * The truncated string for a string.
        * ``{"role": ..., "content": ...}`` for a dict. ``role`` defaults to
          ``"unknown"``; a missing ``content`` becomes ``""``; a ``content``
          of ``None`` stays ``None`` rather than becoming the text ``"None"``.
        * ``str(message)`` truncated for anything else.
        * ``"<unserializable>"`` if any of the above raises.
    """
    try:
        if message is None:
            return None
        if isinstance(message, str):
            return message[:max_chars]
        if isinstance(message, dict):
            content = message.get("content", "")
            return {
                "role": message.get("role", "unknown"),
                # Keep an explicit None distinguishable from real text,
                # matching how a top-level None message is handled above.
                "content": None if content is None else str(content)[:max_chars],
            }
        return str(message)[:max_chars]
    except Exception:
        return "<unserializable>"
def _safe_serialize_small(obj: Any, max_chars: int = 10000, max_items: int = 50) -> Any:
    """Convert ``obj`` into a small, bounded representation.

    Args:
        obj: The value to convert.
        max_chars: Maximum number of characters kept from any string.
        max_items: Maximum number of dict entries or sequence items kept
            (defaults to 50, the previously hard-coded limit).

    Returns:
        * ``None`` for ``None``.
        * A truncated string for ``str``; ``int``/``float``/``bool`` as-is.
        * For a dict: the first ``max_items`` entries with keys and values
          stringified and values truncated.
        * For a list or tuple: a list of the first ``max_items`` items,
          stringified and truncated.
        * ``str(obj)`` truncated for anything else.
        * ``"<unserializable>"`` if conversion raises.
    """
    # Local import keeps this helper self-contained.
    from itertools import islice

    try:
        if obj is None:
            return None
        if isinstance(obj, str):
            return str(obj)[:max_chars]
        if isinstance(obj, (int, float, bool)):
            return obj
        if isinstance(obj, dict):
            # islice stops after max_items without copying the whole mapping.
            return {
                str(key): str(value)[:max_chars]
                for key, value in islice(obj.items(), max_items)
            }
        if isinstance(obj, (list, tuple)):
            return [str(item)[:max_chars] for item in islice(obj, max_items)]
        return str(obj)[:max_chars]
    except Exception:
        return "<unserializable>"