from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Sequence
from briefcase.cowork.receiver import CoworkEvent
from briefcase.semantic_conventions import cowork as conv
@dataclass
class CostSummary:
    """Accumulated API spend and token usage for one grouping key.

    All fields start at zero so an instance can be created empty and then
    incremented event by event.
    """

    # Sum of the per-request cost values, in USD.
    total_cost_usd: float = 0.0
    # Number of API request events folded into this summary.
    request_count: int = 0
    # Summed input / output token counts across those requests.
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    # Cost broken down by model name; a defaultdict so unseen models read 0.0.
    models: Dict[str, float] = field(default_factory=lambda: defaultdict(float))

    @property
    def avg_cost_per_request(self) -> float:
        """Return the mean cost per request, or ``0.0`` when there are none."""
        if not self.request_count:
            return 0.0
        return self.total_cost_usd / self.request_count
@dataclass
class ToolUsageSummary:
    """Invocation counters and totals for a single tool."""

    tool_name: str = ""
    # Every recorded invocation, successful or not.
    invocation_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    # Summed duration (milliseconds) and result size (bytes) over all invocations.
    total_duration_ms: float = 0.0
    total_result_size_bytes: int = 0
    # Error messages collected for this tool, in the order they were recorded.
    errors: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Return successes divided by invocations, or ``0.0`` with no invocations."""
        if not self.invocation_count:
            return 0.0
        return self.success_count / self.invocation_count

    @property
    def avg_duration_ms(self) -> float:
        """Return the mean duration per invocation, or ``0.0`` with no invocations."""
        if not self.invocation_count:
            return 0.0
        return self.total_duration_ms / self.invocation_count
@dataclass
class ApiPerformanceSummary:
    """Latency, error, and cost totals for one model.

    ``request_count`` and ``error_count`` are kept as separate counters, and
    ``error_rate`` treats their sum as the total number of calls.
    ``CoworkDashboards.api_performance`` adds the durations of both request
    events and error events to ``total_duration_ms``.
    """

    model: str = ""
    request_count: int = 0
    error_count: int = 0
    # Summed duration in milliseconds over request events AND error events.
    total_duration_ms: float = 0.0
    # Error status code -> number of occurrences; unseen codes read as 0.
    status_code_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    total_cost_usd: float = 0.0
    # Individual speed values reported by request events.
    speeds: List[float] = field(default_factory=list)

    @property
    def error_rate(self) -> float:
        """Return errors divided by (requests + errors), or ``0.0`` with no calls."""
        total = self.request_count + self.error_count
        return self.error_count / total if total else 0.0

    @property
    def avg_duration_ms(self) -> float:
        """Return the mean duration per call, or ``0.0`` with no calls.

        The denominator counts errors as well as requests because
        ``total_duration_ms`` includes the durations of both. Dividing by
        ``request_count`` alone would inflate the average whenever errors
        exist and report ``0.0`` for a model that has only errors.
        """
        total = self.request_count + self.error_count
        return self.total_duration_ms / total if total else 0.0
@dataclass
class CacheEfficiencySummary:
    """Prompt-cache token totals for one model."""

    model: str = ""
    total_cache_read_tokens: int = 0
    total_cache_creation_tokens: int = 0
    request_count: int = 0

    @property
    def cache_hit_ratio(self) -> float:
        """Return read tokens as a share of read + creation tokens.

        Yields ``0.0`` when both token totals are zero.
        """
        cache_tokens = self.total_cache_read_tokens + self.total_cache_creation_tokens
        if not cache_tokens:
            return 0.0
        return self.total_cache_read_tokens / cache_tokens
class CoworkDashboards:
    """In-memory collector that turns Cowork events into summary tables.

    Ingested events are sorted into three lists by event type (API requests,
    API errors, tool results); events of any other type are dropped. Every
    reporting method recomputes its summaries from the stored events each
    time it is called.
    """

    def __init__(self) -> None:
        self._api_requests: List[CoworkEvent] = []
        self._api_errors: List[CoworkEvent] = []
        self._tool_results: List[CoworkEvent] = []

    def ingest(self, event: CoworkEvent) -> None:
        """Store *event* in the list matching its type; ignore other types."""
        routes = (
            (conv.EVENT_API_REQUEST, self._api_requests),
            (conv.EVENT_API_ERROR, self._api_errors),
            (conv.EVENT_TOOL_RESULT, self._tool_results),
        )
        for event_type, bucket in routes:
            if event.event_type == event_type:
                bucket.append(event)
                return

    def ingest_many(self, events: Sequence[CoworkEvent]) -> None:
        """Ingest each event in *events*, in order."""
        for item in events:
            self.ingest(item)

    def clear(self) -> None:
        """Drop every stored event."""
        for bucket in (self._api_requests, self._api_errors, self._tool_results):
            bucket.clear()

    def cost_by_user(self) -> Dict[str, CostSummary]:
        """Return cost summaries keyed by each request event's ``user_id``."""
        return self._aggregate_cost(lambda event: event.user_id)

    def cost_by_organization(self) -> Dict[str, CostSummary]:
        """Return cost summaries keyed by each request event's ``organization_id``."""
        return self._aggregate_cost(lambda event: event.organization_id)

    def cost_total(self) -> CostSummary:
        """Return one summary over all request events (empty if there are none)."""
        bucket_key = "__total__"
        grouped = self._aggregate_cost(lambda _event: bucket_key)
        if bucket_key in grouped:
            return grouped[bucket_key]
        return CostSummary()

    def tool_usage(self) -> Dict[str, ToolUsageSummary]:
        """Return per-tool usage summaries built from the stored tool results.

        A missing tool name is reported as ``"unknown"``; a missing success
        flag counts as a success.
        """
        by_tool: Dict[str, ToolUsageSummary] = {}
        for event in self._tool_results:
            attrs = event.attributes
            tool = str(attrs.get(conv.TOOL_NAME, "unknown"))
            summary = by_tool.get(tool)
            if summary is None:
                summary = by_tool[tool] = ToolUsageSummary(tool_name=tool)
            summary.invocation_count += 1

            # The flag is compared as text, so True and "True" both succeed.
            succeeded = str(attrs.get(conv.TOOL_SUCCESS, "true")).lower() == "true"
            if succeeded:
                summary.success_count += 1
            else:
                summary.failure_count += 1
                message = attrs.get(conv.TOOL_ERROR, "")
                if message:
                    summary.errors.append(str(message))

            summary.total_duration_ms += _float(attrs.get(conv.TOOL_DURATION_MS, 0))
            summary.total_result_size_bytes += _int(
                attrs.get(conv.TOOL_RESULT_SIZE_BYTES, 0)
            )
        return by_tool

    def api_performance(self) -> Dict[str, ApiPerformanceSummary]:
        """Return per-model performance summaries from request and error events.

        Request events add to the request count, duration, cost and speed
        samples. Error events add to the error count, duration and the
        per-status-code tally. A missing model is reported as ``"unknown"``.
        """
        by_model: Dict[str, ApiPerformanceSummary] = {}

        for event in self._api_requests:
            attrs = event.attributes
            model = str(attrs.get(conv.API_MODEL, "unknown"))
            summary = by_model.get(model)
            if summary is None:
                summary = by_model[model] = ApiPerformanceSummary(model=model)
            summary.request_count += 1
            summary.total_duration_ms += _float(attrs.get(conv.API_DURATION_MS, 0))
            summary.total_cost_usd += _float(attrs.get(conv.API_COST_USD, 0))
            speed = attrs.get(conv.API_SPEED)
            if speed is not None:
                summary.speeds.append(_float(speed))

        for event in self._api_errors:
            attrs = event.attributes
            model = str(attrs.get(conv.API_ERROR_MODEL, "unknown"))
            summary = by_model.get(model)
            if summary is None:
                summary = by_model[model] = ApiPerformanceSummary(model=model)
            summary.error_count += 1
            summary.total_duration_ms += _float(
                attrs.get(conv.API_ERROR_DURATION_MS, 0)
            )
            status = str(attrs.get(conv.API_ERROR_STATUS_CODE, "unknown"))
            summary.status_code_counts[status] += 1

        return by_model

    def cache_efficiency(self) -> Dict[str, CacheEfficiencySummary]:
        """Return per-model cache token totals from the stored request events."""
        by_model: Dict[str, CacheEfficiencySummary] = {}
        for event in self._api_requests:
            attrs = event.attributes
            model = str(attrs.get(conv.API_MODEL, "unknown"))
            summary = by_model.get(model)
            if summary is None:
                summary = by_model[model] = CacheEfficiencySummary(model=model)
            summary.request_count += 1
            summary.total_cache_read_tokens += _int(
                attrs.get(conv.API_CACHE_READ_TOKENS, 0)
            )
            summary.total_cache_creation_tokens += _int(
                attrs.get(conv.API_CACHE_CREATION_TOKENS, 0)
            )
        return by_model

    def _aggregate_cost(self, key_fn) -> Dict[str, CostSummary]:
        """Group request events by ``key_fn(event)`` and total cost and tokens.

        ``key_fn`` is called once per stored request event and its return
        value is used as the dictionary key for that event's summary.
        """
        grouped: Dict[str, CostSummary] = {}
        for event in self._api_requests:
            group = key_fn(event)
            summary = grouped.get(group)
            if summary is None:
                summary = grouped[group] = CostSummary()
            attrs = event.attributes
            request_cost = _float(attrs.get(conv.API_COST_USD, 0))
            summary.request_count += 1
            summary.total_cost_usd += request_cost
            summary.total_input_tokens += _int(attrs.get(conv.API_INPUT_TOKENS, 0))
            summary.total_output_tokens += _int(attrs.get(conv.API_OUTPUT_TOKENS, 0))
            model = str(attrs.get(conv.API_MODEL, "unknown"))
            summary.models[model] += request_cost
        return grouped
def _float(v: Any) -> float:
try:
return float(v)
except (TypeError, ValueError):
return 0.0
def _int(v: Any) -> int:
try:
return int(v)
except (TypeError, ValueError):
return 0