import hashlib
import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Sequence, Union
from briefcase.integrations.frameworks._export_mixin import ExportMixin
try:
from opentelemetry import trace
HAS_OTEL = True
except ImportError:
HAS_OTEL = False
logger = logging.getLogger(__name__)
@dataclass
class CapturedDecision:
    """One captured LangChain run (LLM call, chain, tool invocation, or
    retriever query) normalized into Briefcase's decision-record shape.

    Fix: the original declared ``decision_type`` and ``function_name`` on a
    single fused line, which is a SyntaxError.
    """

    decision_id: str                       # LangChain run_id (stringified) or a fresh uuid4
    decision_type: str                     # "llm" | "chain" | "tool" | "retriever"
    function_name: str                     # model / chain / tool / retriever name
    inputs: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, Any] = field(default_factory=dict)
    model_parameters: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None            # stringified exception when the run failed
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    execution_time_ms: Optional[float] = None
    parent_run_id: Optional[str] = None    # links nested runs to their parent run
    engagement_id: str = ""
    workstream_id: str = ""
    tags: Dict[str, str] = field(default_factory=dict)
    token_usage: Optional[Dict[str, int]] = None
    context_version: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict, omitting optional fields that
        were never set (timestamps are rendered as ISO-8601 strings)."""
        result: Dict[str, Any] = {
            "decision_id": self.decision_id,
            "decision_type": self.decision_type,
            "function_name": self.function_name,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "model_parameters": self.model_parameters,
            "engagement_id": self.engagement_id,
            "workstream_id": self.workstream_id,
            "tags": self.tags,
        }
        if self.error:
            result["error"] = self.error
        if self.started_at:
            result["started_at"] = self.started_at.isoformat()
        if self.ended_at:
            result["ended_at"] = self.ended_at.isoformat()
        if self.execution_time_ms is not None:
            result["execution_time_ms"] = self.execution_time_ms
        if self.parent_run_id:
            result["parent_run_id"] = self.parent_run_id
        if self.token_usage:
            result["token_usage"] = self.token_usage
        if self.context_version is not None:
            result["context_version"] = self.context_version
        return result
class BriefcaseLangChainHandler(ExportMixin):
    """LangChain callback handler that records LLM, chain, tool, and
    retriever runs as :class:`CapturedDecision` objects.

    Duck-types LangChain's ``BaseCallbackHandler``: it implements the
    ``on_*`` hooks and exposes the ``ignore_*`` / ``raise_error`` /
    ``run_inline`` attributes LangChain's callback manager inspects.
    Start hooks are best-effort and never propagate exceptions into the
    host application; failures are logged at DEBUG level.

    Completed decisions accumulate in ``self._decisions``. When a root
    chain (no parent run) finishes, the assembled record — the root plus
    its direct children — is handed to ``self._trigger_export``
    (presumably provided by ExportMixin; only called here, not defined).
    """

    def __init__(
        self,
        engagement_id: str = "",
        workstream_id: str = "",
        capture_llm: bool = True,
        capture_chains: bool = True,
        capture_tools: bool = True,
        capture_retrievers: bool = True,
        max_input_chars: int = 10000,
        max_output_chars: int = 10000,
        context_version: Optional[str] = None,
        async_capture: bool = True,
        client: Any = None,
        exporter: Any = None,
    ):
        self.engagement_id = engagement_id
        self.workstream_id = workstream_id
        self.capture_llm = capture_llm
        self.capture_chains = capture_chains
        self.capture_tools = capture_tools
        self.capture_retrievers = capture_retrievers
        self.max_input_chars = max_input_chars
        self.max_output_chars = max_output_chars
        self.context_version = context_version
        self.async_capture = async_capture
        self.client = client
        self._exporter = exporter
        # Completed runs, in finish order.
        self._decisions: List[CapturedDecision] = []
        # Started-but-unfinished runs, keyed by stringified run_id.
        self._inflight: Dict[str, CapturedDecision] = {}
        # Attributes read by LangChain's callback manager.
        self.raise_error = False
        self.run_inline = True
        self.ignore_llm = not capture_llm
        self.ignore_chain = not capture_chains
        self.ignore_agent = False
        self.ignore_retriever = not capture_retrievers

    # ------------------------------------------------------------------
    # Accessors

    def get_decisions(self) -> List[CapturedDecision]:
        """Return a shallow copy of all completed decisions."""
        return list(self._decisions)

    def get_decisions_as_dicts(self) -> List[Dict[str, Any]]:
        """Return all completed decisions as JSON-friendly dicts."""
        return [d.to_dict() for d in self._decisions]

    def clear(self) -> None:
        """Discard all completed and in-flight decisions."""
        self._decisions.clear()
        self._inflight.clear()

    @property
    def decision_count(self) -> int:
        """Number of completed decisions."""
        return len(self._decisions)

    # ------------------------------------------------------------------
    # Internal helpers (shared by all hook families)

    def _open_decision(
        self,
        run_id: Any,
        decision_type: str,
        function_name: str,
        inputs: Dict[str, Any],
        parent_run_id: Any,
        tags: Optional[List[str]],
        metadata: Optional[Dict[str, Any]],
        model_parameters: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Register an in-flight decision with the fields every start hook
        shares; returns the run key used. Falls back to a fresh uuid4 when
        LangChain supplies no run_id."""
        run_key = str(run_id) if run_id else str(uuid.uuid4())
        self._inflight[run_key] = CapturedDecision(
            decision_id=run_key,
            decision_type=decision_type,
            function_name=function_name,
            inputs=inputs,
            model_parameters=model_parameters or {},
            started_at=datetime.now(timezone.utc),
            parent_run_id=str(parent_run_id) if parent_run_id else None,
            engagement_id=self.engagement_id,
            workstream_id=self.workstream_id,
            tags=_merge_tags(tags, metadata),
            context_version=self.context_version,
        )
        return run_key

    def _close_decision(self, run_id: Any) -> Optional[CapturedDecision]:
        """Pop the matching in-flight decision and stamp its end time and
        duration. Returns None when the run was never captured (capture
        disabled, or the start hook failed)."""
        run_key = str(run_id) if run_id else None
        decision = self._inflight.pop(run_key, None) if run_key else None
        if decision is None:
            return None
        decision.ended_at = datetime.now(timezone.utc)
        if decision.started_at:
            elapsed = (decision.ended_at - decision.started_at).total_seconds()
            decision.execution_time_ms = elapsed * 1000
        return decision

    def _record_error(self, run_id: Any, error: BaseException) -> None:
        """Shared tail for every on_*_error hook: close the run, attach the
        stringified error, and store the decision."""
        decision = self._close_decision(run_id)
        if decision is None:
            return
        decision.error = str(error)
        self._decisions.append(decision)

    # ------------------------------------------------------------------
    # LLM callbacks

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Open an "llm" decision capturing truncated prompts and params."""
        if not self.capture_llm:
            return
        try:
            model_name = _extract_model_name(serialized, kwargs)
            self._open_decision(
                run_id,
                "llm",
                model_name,
                {"prompts": [p[:self.max_input_chars] for p in prompts]},
                parent_run_id,
                tags,
                metadata,
                model_parameters=_extract_model_params(serialized, kwargs),
            )
            if HAS_OTEL:
                _emit_otel_event("llm_start", {
                    "model": model_name,
                    "prompt_count": len(prompts),
                })
        except Exception:
            # Best-effort capture: never break the host LLM call.
            logger.debug("on_llm_start capture failed", exc_info=True)

    def on_llm_end(
        self,
        response: Any,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "llm" decision with output text and usage."""
        decision = self._close_decision(run_id)
        if decision is None:
            return
        output_text, token_usage = _extract_llm_output(response)
        decision.outputs = {
            "text": output_text[:self.max_output_chars] if output_text else None,
        }
        decision.token_usage = token_usage
        self._decisions.append(decision)

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "llm" decision with the error attached."""
        self._record_error(run_id, error)

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[Any],
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Open an "llm" decision for a chat-model (message-based) call.

        Now guarded like the other start hooks so a capture failure cannot
        propagate into the host application (previously unprotected)."""
        if not self.capture_llm:
            return
        try:
            self._open_decision(
                run_id,
                "llm",
                _extract_model_name(serialized, kwargs),
                {"messages": _serialize_messages(messages, self.max_input_chars)},
                parent_run_id,
                tags,
                metadata,
                model_parameters=_extract_model_params(serialized, kwargs),
            )
        except Exception:
            logger.debug("on_chat_model_start capture failed", exc_info=True)

    # ------------------------------------------------------------------
    # Chain callbacks

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Open a "chain" decision with truncated inputs."""
        if not self.capture_chains:
            return
        try:
            self._open_decision(
                run_id,
                "chain",
                _extract_chain_name(serialized),
                _truncate_dict(inputs, self.max_input_chars),
                parent_run_id,
                tags,
                metadata,
            )
        except Exception:
            logger.debug("on_chain_start capture failed", exc_info=True)

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "chain" decision; a root chain (no parent)
        additionally triggers export of the assembled decision record."""
        decision = self._close_decision(run_id)
        if decision is None:
            return
        decision.outputs = _truncate_dict(outputs, self.max_output_chars)
        self._decisions.append(decision)
        if parent_run_id is None:
            self._trigger_export(self._assemble_decision_record(decision))

    def on_chain_error(
        self,
        error: BaseException,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "chain" decision with the error attached."""
        self._record_error(run_id, error)

    # ------------------------------------------------------------------
    # Tool callbacks

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Open a "tool" decision with the truncated input string."""
        if not self.capture_tools:
            return
        try:
            # Prefer the explicit name; fall back to the last element of the
            # serialized id path. (The previous one-liner raised IndexError
            # on an empty id list, silently aborting capture.)
            tool_name = serialized.get("name")
            if tool_name is None:
                id_path = serialized.get("id")
                if isinstance(id_path, list) and id_path:
                    tool_name = id_path[-1]
                else:
                    tool_name = "unknown_tool"
            self._open_decision(
                run_id,
                "tool",
                tool_name,
                {"input": input_str[:self.max_input_chars]},
                parent_run_id,
                tags,
                metadata,
            )
        except Exception:
            logger.debug("on_tool_start capture failed", exc_info=True)

    def on_tool_end(
        self,
        output: str,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "tool" decision with its truncated output."""
        decision = self._close_decision(run_id)
        if decision is None:
            return
        decision.outputs = {"output": str(output)[:self.max_output_chars]}
        self._decisions.append(decision)

    def on_tool_error(
        self,
        error: BaseException,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "tool" decision with the error attached."""
        self._record_error(run_id, error)

    # ------------------------------------------------------------------
    # Retriever callbacks

    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Open a "retriever" decision with the truncated query.

        Fixes two inconsistencies with the sibling start hooks: the
        decision now carries ``context_version``, and capture failures are
        swallowed instead of propagating."""
        if not self.capture_retrievers:
            return
        try:
            self._open_decision(
                run_id,
                "retriever",
                serialized.get("name", "retriever"),
                {"query": query[:self.max_input_chars]},
                parent_run_id,
                tags,
                metadata,
            )
        except Exception:
            logger.debug("on_retriever_start capture failed", exc_info=True)

    def on_retriever_end(
        self,
        documents: Any,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "retriever" decision with doc summaries."""
        decision = self._close_decision(run_id)
        if decision is None:
            return
        doc_summaries = _serialize_documents(documents, self.max_output_chars)
        decision.outputs = {
            "document_count": len(doc_summaries),
            "documents": doc_summaries,
        }
        self._decisions.append(decision)

    def on_retriever_error(
        self,
        error: BaseException,
        *,
        run_id: Any = None,
        parent_run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Close the matching "retriever" decision with the error attached."""
        self._record_error(run_id, error)

    # ------------------------------------------------------------------
    # Record assembly and no-op hooks

    def _assemble_decision_record(self, decision: CapturedDecision) -> Dict[str, Any]:
        """Render *decision* as a dict with its direct children (completed
        decisions whose parent_run_id matches) inlined as child_spans."""
        record = decision.to_dict()
        record["child_spans"] = [
            d.to_dict() for d in self._decisions
            if d.parent_run_id == decision.decision_id
        ]
        return record

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Streaming tokens are not captured; present to satisfy the
        callback interface."""

    def on_text(self, text: str, **kwargs: Any) -> None:
        """Free-form text events are not captured."""

    def on_agent_action(self, action: Any, **kwargs: Any) -> None:
        """Agent actions are not captured (tool hooks cover the calls)."""

    def on_agent_finish(self, finish: Any, **kwargs: Any) -> None:
        """Agent finish events are not captured."""
def _extract_model_name(
serialized: Dict[str, Any], kwargs: Dict[str, Any]
) -> str:
for key in ("model_name", "model", "model_id"):
if key in kwargs:
return str(kwargs[key])
ser_kwargs = serialized.get("kwargs", {})
for key in ("model_name", "model", "model_id"):
if key in ser_kwargs:
return str(ser_kwargs[key])
id_path = serialized.get("id", [])
if isinstance(id_path, list) and id_path:
return id_path[-1]
return serialized.get("name", "unknown_model")
def _extract_model_params(
serialized: Dict[str, Any], kwargs: Dict[str, Any]
) -> Dict[str, Any]:
params = {}
ser_kwargs = serialized.get("kwargs", {})
for key in ("temperature", "max_tokens", "top_p", "frequency_penalty",
"presence_penalty", "stop", "model_name", "model"):
if key in ser_kwargs:
params[key] = ser_kwargs[key]
elif key in kwargs:
params[key] = kwargs[key]
return params
def _extract_chain_name(serialized: Dict[str, Any]) -> str:
id_path = serialized.get("id", [])
if isinstance(id_path, list) and id_path:
return id_path[-1]
return serialized.get("name", "unknown_chain")
def _extract_llm_output(response: Any) -> tuple:
output_text = ""
token_usage = None
if response is None:
return output_text, token_usage
if hasattr(response, "generations"):
gens = response.generations
if gens and len(gens) > 0 and len(gens[0]) > 0:
first_gen = gens[0][0]
if hasattr(first_gen, "text"):
output_text = first_gen.text
elif hasattr(first_gen, "message") and hasattr(first_gen.message, "content"):
output_text = first_gen.message.content
if hasattr(response, "llm_output") and response.llm_output:
usage = response.llm_output.get("token_usage", {})
if usage:
token_usage = {
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
return output_text, token_usage
def _serialize_messages(
messages: List[Any], max_chars: int
) -> List[Dict[str, str]]:
result = []
for msg_list in messages:
if not isinstance(msg_list, (list, tuple)):
msg_list = [msg_list]
for msg in msg_list:
if hasattr(msg, "type") and hasattr(msg, "content"):
result.append({
"role": getattr(msg, "type", "unknown"),
"content": str(getattr(msg, "content", ""))[:max_chars],
})
elif isinstance(msg, dict):
result.append({
"role": msg.get("role", "unknown"),
"content": str(msg.get("content", ""))[:max_chars],
})
else:
result.append({"role": "unknown", "content": str(msg)[:max_chars]})
return result
def _serialize_documents(
documents: Any, max_chars: int
) -> List[Dict[str, Any]]:
if documents is None:
return []
result = []
for doc in documents:
if hasattr(doc, "page_content") and hasattr(doc, "metadata"):
result.append({
"content_preview": doc.page_content[:200],
"metadata": doc.metadata if isinstance(doc.metadata, dict) else {},
})
elif isinstance(doc, dict):
content = doc.get("page_content", doc.get("content", ""))
result.append({
"content_preview": str(content)[:200],
"metadata": doc.get("metadata", {}),
})
else:
result.append({"content_preview": str(doc)[:200]})
return result
def _truncate_dict(d: Any, max_chars: int) -> Dict[str, Any]:
if not isinstance(d, dict):
return {"value": str(d)[:max_chars]}
result = {}
for k, v in d.items():
if isinstance(v, str):
result[k] = v[:max_chars]
elif isinstance(v, dict):
result[k] = _truncate_dict(v, max_chars)
else:
result[k] = v
return result
def _merge_tags(
tags: Optional[List[str]], metadata: Optional[Dict[str, Any]]
) -> Dict[str, str]:
result = {}
if tags:
for i, tag in enumerate(tags):
result[f"tag_{i}"] = tag
if metadata:
for k, v in metadata.items():
result[str(k)] = str(v)
return result
def _emit_otel_event(name: str, attributes: Dict[str, Any]) -> None:
    """Attach a ``briefcase.langchain.<name>`` event to the active
    OpenTelemetry span, when OTel is installed and a span is recording.

    Best-effort: every failure is swallowed so tracing can never break
    capture.
    """
    if not HAS_OTEL:
        return
    try:
        active_span = trace.get_current_span()
        if not active_span or not active_span.is_recording():
            return
        active_span.add_event(f"briefcase.langchain.{name}", attributes=attributes)
    except Exception:
        pass
def require_langchain() -> None:
    """Raise an actionable ImportError when langchain-core is missing.

    Call before constructing BriefcaseLangChainHandler so users get an
    install hint instead of a bare ImportError from deep inside LangChain.

    Fix: the original fused ``import langchain_core`` and the ``except``
    clause onto one line, which is a SyntaxError.
    """
    try:
        import langchain_core  # noqa: F401  (import is the availability probe)
    except ImportError as exc:
        raise ImportError(
            "langchain-core is required for BriefcaseLangChainHandler. "
            "Install it with: pip install langchain-core"
        ) from exc