// @harn-entrypoint-category llm.stdlib
//
// std/llm/tool_telemetry — standardized tool-call spans + built-in sinks.
//
// `with_telemetry` (in std/llm/tool_middleware) routes every tool dispatch
// through one of these sinks. The span shape is the contract that consumers
// (Langfuse, OpenTelemetry, custom dashboards) bind to. Defined here so it
// stays decoupled from the middleware seam and from any specific exporter.
//
// Span shape (also in docs/src/observability/tool-call-spans.md):
//
// {
// name: "tool_call.<tool_name>",
// kind: "tool_call",
// span_id: <tool_call_id>,
// trace_id: <session_id | "tool_call:" + call_id>,
// parent_span_id: <extras.parent_span_id | nil>,
// start_time_ms, end_time_ms, duration_ms,
// start_time_iso, end_time_iso,
// status: "ok"|"error"|"dry_run"|"timeout"|"rate_limited"|
// "consent_denied"|"schema_violation"|"redacted"|...,
// attributes: {
// tool_name, tool_call_id, executor, status, ok,
// args_hash, iteration, session_id, persona?, mode?,
// error?, error_category?, rendered_result_size?,
// "gen_ai.tool.name", "gen_ai.tool.call.id", "gen_ai.tool.executor",
// },
// events: [{name, time_ms, attributes?}, ...],
// child_spans: [{name, status, duration_ms?, started_at?, ended_at?, error?, attributes?}],
// call: <middleware envelope>,
// result: <dispatch-result dict>,
// }
//
// The schema is additive — sinks that don't recognize a field MUST ignore
// it, and new fields can be added without breaking consumers.
import { env_str } from "std/config"
// -------------------------------------------------------------------------------------------------
// Span builder
// -------------------------------------------------------------------------------------------------
// Coerce `value` to a dict: dicts pass through untouched, every other
// type (nil included) is replaced by an empty dict.
fn __tt_dict(value) {
  if type_of(value) != "dict" {
    return {}
  }
  return value
}
// Coerce `value` to a list: lists pass through untouched, every other
// type (nil included) is replaced by an empty list.
fn __tt_list(value) {
  if type_of(value) != "list" {
    return []
  }
  return value
}
// SHA-256 of the JSON-serialized tool arguments. Nil arguments are hashed
// as an empty dict.
fn __tt_args_hash(tool_args) {
  let serialized = json_stringify(tool_args ?? {})
  return sha256(serialized)
}
// Default trace id: the session id when one is set, otherwise a synthetic
// "tool_call:<call_id>" id.
fn __tt_trace_id(session_id, call_id) {
  if session_id == "" {
    return "tool_call:" + call_id
  }
  return session_id
}
// Span status for a dispatch result: an explicit non-empty `status` wins;
// otherwise the status is derived from `ok` ("ok" when truthy, "error"
// when false or missing).
fn __tt_status(result) {
  let explicit = to_string(result?.status ?? "")
  if explicit == "" {
    if result?.ok ?? false {
      return "ok"
    }
    return "error"
  }
  return explicit
}
// Convert the `layers` entries of a dispatch audit into child-span dicts.
// Non-dict entries are skipped. Each child span always carries `name` and
// `status`; `started_at`, `ended_at`, `error`, `duration_ms` (taken from
// the layer's `elapsed_ms`) and `attributes` appear only when the layer
// supplies the corresponding data.
fn __tt_child_spans(audit) {
  var collected = []
  for entry in __tt_list(audit?.layers) {
    if type_of(entry) != "dict" {
      continue
    }
    // Layer-specific details are gathered first; they are attached under
    // `attributes` only when at least one of them is present.
    var detail = {}
    if entry?.count != nil {
      detail = detail + {count: entry.count}
    }
    if entry?.cap != nil {
      detail = detail + {cap: entry.cap}
    }
    if entry?.key != nil {
      detail = detail + {key: to_string(entry.key)}
    }
    if entry?.limit_ms != nil {
      detail = detail + {limit_ms: entry.limit_ms}
    }
    let child_name = "tool_call." + to_string(entry?.name ?? "layer")
    let child_status = to_string(entry?.status ?? "ok")
    var child = {name: child_name, status: child_status}
    if entry?.started_at != nil {
      child = child + {started_at: entry.started_at}
    }
    if entry?.ended_at != nil {
      child = child + {ended_at: entry.ended_at}
    }
    if entry?.error != nil {
      child = child + {error: to_string(entry.error)}
    }
    if entry?.elapsed_ms != nil {
      child = child + {duration_ms: entry.elapsed_ms}
    }
    if len(detail.keys()) > 0 {
      child = child + {attributes: detail}
    }
    collected = collected.push(child)
  }
  return collected
}
// Build the span's event list. Every span gets a "tool_call.dispatched"
// event at start_ms and a "tool_call.result_returned" event at end_ms.
// When the result's error_category is "scope_violation" or
// "policy_blocked", a third "tool_call.scope_violation" event (stamped
// at end_ms) carries the error text and the category.
fn __tt_events(result, start_ms, end_ms) {
  let dispatched = {name: "tool_call.dispatched", time_ms: start_ms}
  let returned = {name: "tool_call.result_returned", time_ms: end_ms}
  let lifecycle = [dispatched, returned]
  let category = to_string(result?.error_category ?? "")
  if !(category == "scope_violation" || category == "policy_blocked") {
    return lifecycle
  }
  let violation = {
    name: "tool_call.scope_violation",
    time_ms: end_ms,
    attributes: {error: to_string(result?.error ?? ""), category: category},
  }
  return lifecycle.push(violation)
}
/**
 * Assemble the standardized tool-call span dict for one dispatch.
 *
 * Inputs are the middleware envelope (`call`), its dispatch result, and
 * the start/end timestamps in milliseconds; `duration_ms` is computed as
 * `end_ms - start_ms`. Missing envelope fields fall back to "" (strings)
 * or 0 (iteration), and a missing `result.ok` is treated as false.
 *
 * `extras` (optional dict; any non-dict value is treated as empty):
 *   persona: string            — caller identity (e.g. "merge_captain")
 *   mode: string               — caller mode (e.g. "preview"/"apply")
 *   parent_span_id: string     — set when nested under an agent_turn span
 *   trace_id_override: string  — replace the default session_id-derived trace id
 *   extra_attributes: dict     — additional attributes merged into `attributes`
 *   start_time_iso: string     — pre-sampled ISO timestamp matching start_ms
 *   end_time_iso: string       — pre-sampled ISO timestamp matching end_ms
 *
 * `extra_attributes` is merged after the built-in attributes (so it can
 * override them) but before `rendered_result_size`, which is added last
 * and only when `result.rendered_result` is a string.
 */
pub fn tool_call_span(call, result, start_ms, end_ms, extras = nil) {
  let options = __tt_dict(extras)
  let turn = call?.turn
  let session_id = to_string(turn?.session_id ?? "")
  let iteration = turn?.iteration ?? 0
  let tool_name = to_string(call?.tool_name ?? "")
  let call_id = to_string(call?.call_id ?? "")
  let executor = result?.executor
  let status = __tt_status(result)
  let succeeded = result?.ok ?? false
  var attrs = {
    tool_name: tool_name,
    tool_call_id: call_id,
    executor: executor,
    status: status,
    ok: succeeded,
    args_hash: __tt_args_hash(call?.tool_args),
    iteration: iteration,
    session_id: session_id,
    "gen_ai.tool.name": tool_name,
    "gen_ai.tool.call.id": call_id,
    "gen_ai.tool.executor": executor,
  }
  // Optional attributes are only present when their source value is set.
  if result?.error != nil {
    attrs = attrs + {error: to_string(result.error)}
  }
  if result?.error_category != nil {
    attrs = attrs + {error_category: to_string(result.error_category)}
  }
  if options?.persona != nil {
    attrs = attrs + {persona: to_string(options.persona)}
  }
  if options?.mode != nil {
    attrs = attrs + {mode: to_string(options.mode)}
  }
  if type_of(options?.extra_attributes) == "dict" {
    attrs = attrs + options.extra_attributes
  }
  let rendered_text = result?.rendered_result
  if type_of(rendered_text) == "string" {
    attrs = attrs + {rendered_result_size: len(rendered_text)}
  }
  // An explicit override beats the session-derived default trace id.
  let trace_id = if options?.trace_id_override != nil {
    to_string(options.trace_id_override)
  } else {
    __tt_trace_id(session_id, call_id)
  }
  let elapsed_ms = end_ms - start_ms
  return {
    name: "tool_call." + tool_name,
    kind: "tool_call",
    span_id: call_id,
    trace_id: trace_id,
    parent_span_id: options?.parent_span_id,
    start_time_ms: start_ms,
    end_time_ms: end_ms,
    duration_ms: elapsed_ms,
    start_time_iso: options?.start_time_iso,
    end_time_iso: options?.end_time_iso,
    status: status,
    attributes: attrs,
    events: __tt_events(result, start_ms, end_ms),
    child_spans: __tt_child_spans(result?.audit),
    call: call,
    result: result,
  }
}
// -------------------------------------------------------------------------------------------------
// Built-in sinks
// -------------------------------------------------------------------------------------------------
/**
 * langfuse_sink(opts?) -> fn(span)
 *
 * Builds a sink that POSTs each span as an `observation-create` event to
 * the Langfuse ingestion endpoint (`<base_url>/api/public/ingestion`)
 * using HTTP Basic auth built from the public and secret keys.
 *
 * Credentials come from the environment unless overridden:
 *   LANGFUSE_BASE_URL     e.g. "https://us.cloud.langfuse.com"
 *   LANGFUSE_PUBLIC_KEY
 *   LANGFUSE_SECRET_KEY
 *
 * Options (all optional):
 *   base_url:    override LANGFUSE_BASE_URL
 *   public_key:  override LANGFUSE_PUBLIC_KEY
 *   secret_key:  override LANGFUSE_SECRET_KEY
 *   project:     metadata.project tag on each observation
 *   environment: metadata.environment tag (dev/staging/prod/…)
 *   release:     metadata.release tag
 *   timeout_ms:  HTTP timeout per POST (default 5000)
 *   on_error:    fn(message, span) — invoked when credentials are missing,
 *                the request fails, or the response is not ok. Without it
 *                these failures are swallowed so telemetry problems don't
 *                break the agent loop.
 *
 * Configuration is resolved once, when the sink is created; the missing
 * credentials check runs on every span. Each tool call issues its own
 * POST. For high-throughput workloads, wrap with `with_idempotency`
 * upstream or batch via a custom sink.
 */
pub fn langfuse_sink(opts = nil) {
  let settings = __tt_dict(opts)
  let base_url = __tt_resolve_str(settings?.base_url, "LANGFUSE_BASE_URL")
  let public_key = __tt_resolve_str(settings?.public_key, "LANGFUSE_PUBLIC_KEY")
  let secret_key = __tt_resolve_str(settings?.secret_key, "LANGFUSE_SECRET_KEY")
  let project = settings?.project
  let environment = settings?.environment
  let release = settings?.release
  let timeout_ms = settings?.timeout_ms ?? 5000
  let on_error = settings?.on_error
  return { span ->
    let creds_missing = base_url == "" || public_key == "" || secret_key == ""
    if creds_missing {
      __tt_report_error(
        on_error,
        "langfuse_sink: missing credentials (set LANGFUSE_BASE_URL/PUBLIC_KEY/SECRET_KEY or pass via opts)",
        span,
      )
      return
    }
    let payload = __tt_langfuse_body(span, project, environment, release)
    let url = __tt_langfuse_endpoint(base_url)
    let credentials = base64_encode(public_key + ":" + secret_key)
    let request = {
      headers: {authorization: "Basic " + credentials, "content-type": "application/json"},
      body: payload,
      timeout_ms: timeout_ms,
    }
    // Only the HTTP call itself is wrapped, so transport failures surface
    // as an error outcome instead of propagating to the caller.
    let outcome = try {
      http_request("POST", url, request)
    }
    if is_err(outcome) {
      __tt_report_error(on_error, "langfuse_sink: " + to_string(unwrap_err(outcome)), span)
      return
    }
    let response = unwrap(outcome)
    let accepted = response?.ok ?? false
    if !accepted {
      let status_text = to_string(response?.status ?? 0)
      let body_text = to_string(response?.body ?? "")
      __tt_report_error(on_error, "langfuse_sink HTTP " + status_text + ": " + body_text, span)
    }
  }
}
/**
 * otel_sink(opts?) -> fn(span)
 *
 * Stub sink: reshapes each span into an OpenTelemetry-style record
 * (nanosecond timestamps, `status.code` of "OK"/"ERROR" derived from
 * `attributes.ok`, `status.message` from `attributes.error`) and emits
 * it. Real OTLP HTTP export is a follow-up (the harn-vm `observability`
 * module already speaks OTLP for orchestrator spans — wiring tool-call
 * spans through it goes here when needed).
 *
 * Options:
 *   sink:   fn(otel_record) — custom emitter; errors it raises are
 *           swallowed. When absent or not callable, the record is written
 *           to stderr as one JSON line.
 *   prefix: string prefix on each stderr line (default "[otel] ")
 */
pub fn otel_sink(opts = nil) {
  let settings = __tt_dict(opts)
  let emitter = settings?.sink
  let line_prefix = to_string(settings?.prefix ?? "[otel] ")
  return { span ->
    let attrs = __tt_dict(span?.attributes)
    let status_code = if attrs?.ok ?? false {
      "OK"
    } else {
      "ERROR"
    }
    let started_nano = __tt_ms_to_nano(span?.start_time_ms ?? 0)
    let ended_nano = __tt_ms_to_nano(span?.end_time_ms ?? 0)
    let otel_events = __tt_otel_events(__tt_list(span?.events))
    let record = {
      name: span?.name,
      kind: "INTERNAL",
      span_id: span?.span_id,
      trace_id: span?.trace_id,
      parent_span_id: span?.parent_span_id,
      start_time_unix_nano: started_nano,
      end_time_unix_nano: ended_nano,
      attributes: attrs,
      events: otel_events,
      status: {code: status_code, message: to_string(attrs?.error ?? "")},
    }
    if !__tt_is_callable(emitter) {
      eprintln(line_prefix + json_stringify(record))
    } else {
      // Best-effort: a failing custom emitter must not break the caller.
      let _ = try {
        emitter(record)
      }
    }
  }
}
/**
 * stderr_sink(opts?) -> fn(span)
 *
 * Development sink: writes each span to stderr as a single JSON line,
 * preceded by a prefix. Handy before a real backend is wired up; pipe
 * the output through `jq` for pretty-printing.
 *
 * Options:
 *   prefix: string prefix on each line (default "[tool_telemetry] ")
 */
pub fn stderr_sink(opts = nil) {
  let settings = __tt_dict(opts)
  let line_prefix = to_string(settings?.prefix ?? "[tool_telemetry] ")
  return { span ->
    let line = line_prefix + json_stringify(span)
    eprintln(line)
  }
}
/**
 * noop_sink() -> fn(span)
 *
 * Returns a sink that accepts a span and does nothing with it. Useful
 * when telemetry is conditionally disabled.
 */
pub fn noop_sink() {
  let discard_span = { _span ->
  }
  return discard_span
}
// -------------------------------------------------------------------------------------------------
// Sink resolution
// -------------------------------------------------------------------------------------------------
// True when `type_of(value)` reports one of the callable type names
// ("function", "closure" or "fn").
fn __tt_is_callable(value) {
  let kind = type_of(value)
  if kind == "function" {
    return true
  }
  if kind == "closure" {
    return true
  }
  return kind == "fn"
}
// Resolve a string setting: a non-nil override wins (stringified);
// otherwise the environment variable `env_key` is consulted, and "" is
// returned when that lookup yields nil.
fn __tt_resolve_str(override_value, env_key) {
  if override_value == nil {
    let from_env = env_str(env_key)
    if from_env != nil {
      return to_string(from_env)
    }
    return ""
  }
  return to_string(override_value)
}
// Forward a sink failure to the optional `on_error(message, span)`
// callback. Does nothing when the callback is nil or not callable, and
// swallows anything the callback itself raises.
fn __tt_report_error(on_error, message, span) {
  if on_error == nil || !__tt_is_callable(on_error) {
    return
  }
  let _ = try {
    on_error(message, span)
  }
}
// Combine several sinks into one that forwards each span to every
// callable entry, in list order. Non-callable entries are skipped and a
// failure in one sink does not stop delivery to the rest. An empty or
// non-list input yields the no-op sink.
fn __tt_compose_sinks(sinks) {
  let targets = __tt_list(sinks)
  if len(targets) == 0 {
    return noop_sink()
  }
  return { span ->
    for target in targets {
      if !__tt_is_callable(target) {
        continue
      }
      let _ = try {
        target(span)
      }
    }
  }
}
/**
 * Resolve a `with_telemetry` config (callable | string | dict) into a
 * single callable sink. Exposed so middleware authors can reuse the
 * resolver when building their own telemetry wrappers.
 *
 * Accepted shapes:
 *   nil                                   — no-op sink
 *   callable                              — used as-is
 *   "langfuse"|"otel"|"stderr"|"noop"     — built-in sink with env defaults
 *   {sink: callable | string, ...opts}    — single sink with options
 *   {sinks: [...], ...opts}               — fan out to multiple sinks
 *
 * For the dict shapes, the whole dict is passed as options to any
 * built-in sink named by a string entry. A dict that has neither a list
 * `sinks` nor a non-nil `sink` resolves to the no-op sink, matching how
 * nil is treated everywhere else in the resolver.
 *
 * Throws when `value` is not nil, callable, a string, or a dict, and
 * when a sink entry or built-in sink name is not recognized.
 */
pub fn resolve_telemetry_sink(value) {
  if value == nil {
    return noop_sink()
  }
  if __tt_is_callable(value) {
    return value
  }
  if type_of(value) == "string" {
    return __tt_named_sink(value, {})
  }
  if type_of(value) != "dict" {
    throw "tool_telemetry: with_telemetry config must be callable, string, or dict; got " + type_of(value)
  }
  let cfg = value
  if type_of(cfg?.sinks) == "list" {
    var resolved = []
    for item in cfg.sinks {
      resolved = resolved.push(__tt_resolve_one(item, cfg))
    }
    return __tt_compose_sinks(resolved)
  }
  // Pass the `sink` entry itself, never the config dict: handing `cfg`
  // back as its own sink entry makes __tt_resolve_one re-enter this
  // function with the same dict and recurse without end. A missing
  // `sink` is nil here, which __tt_resolve_one maps to the no-op sink.
  return __tt_resolve_one(cfg?.sink, cfg)
}
// Resolve a single sink entry. nil becomes the no-op sink, callables are
// returned unchanged, strings name a built-in sink that receives
// `parent_cfg` as its options, and dicts are resolved as their own
// nested config (without `parent_cfg`). Any other type throws.
fn __tt_resolve_one(item, parent_cfg) {
  if item == nil {
    return noop_sink()
  }
  if __tt_is_callable(item) {
    return item
  }
  let kind = type_of(item)
  if kind == "dict" {
    return resolve_telemetry_sink(item)
  }
  if kind == "string" {
    return __tt_named_sink(item, parent_cfg)
  }
  throw "tool_telemetry: sink entry must be callable, string, or dict; got " + kind
}
// Instantiate a built-in sink by name, passing `opts` to the sinks that
// accept options ("noop" takes none). Throws on an unrecognized name.
fn __tt_named_sink(name, opts) {
  if name == "noop" {
    return noop_sink()
  }
  if name == "stderr" {
    return stderr_sink(opts)
  }
  if name == "otel" {
    return otel_sink(opts)
  }
  if name == "langfuse" {
    return langfuse_sink(opts)
  }
  throw "tool_telemetry: unknown built-in sink '" + name + "'; expected langfuse|otel|stderr|noop"
}
// -------------------------------------------------------------------------------------------------
// Helpers — Langfuse + OTel encoding
// -------------------------------------------------------------------------------------------------
// Build the Langfuse ingestion URL from a base URL. A single trailing
// "/" on the base URL is dropped before the path is appended.
fn __tt_langfuse_endpoint(base_url) {
  let ingestion_path = "/api/public/ingestion"
  if !ends_with(base_url, "/") {
    return base_url + ingestion_path
  }
  let without_slash = substring(base_url, 0, len(base_url) - 1)
  return without_slash + ingestion_path
}
// Serialize a span into the JSON request body for the Langfuse ingestion
// endpoint: a one-element `batch` holding an `observation-create` event
// whose body is a SPAN observation.
//
// `project`, `environment` and `release` are added to the observation
// metadata only when non-nil, as are the span's `persona` and `mode`
// attributes. `level` is "DEFAULT" when `attributes.ok` is truthy and
// "ERROR" otherwise. When the span carries no ISO start/end timestamp,
// the current time is used in its place.
fn __tt_langfuse_body(span, project, environment, release) {
  let attrs = __tt_dict(span?.attributes)
  let span_id = to_string(span?.span_id ?? "")
  let trace_id = to_string(span?.trace_id ?? "")
  let sent_at = date_now_iso()
  let started_at = span?.start_time_iso ?? sent_at
  let ended_at = span?.end_time_iso ?? sent_at
  let level = if !(attrs?.ok ?? false) {
    "ERROR"
  } else {
    "DEFAULT"
  }
  var meta = {
    args_hash: attrs?.args_hash,
    iteration: attrs?.iteration,
    executor: attrs?.executor,
    child_spans: span?.child_spans,
    events: span?.events,
  }
  if project != nil {
    meta = meta + {project: to_string(project)}
  }
  if environment != nil {
    meta = meta + {environment: to_string(environment)}
  }
  if release != nil {
    meta = meta + {release: to_string(release)}
  }
  if attrs?.persona != nil {
    meta = meta + {persona: attrs.persona}
  }
  if attrs?.mode != nil {
    meta = meta + {mode: attrs.mode}
  }
  let observation = {
    id: span_id,
    traceId: trace_id,
    type: "SPAN",
    name: to_string(span?.name ?? ""),
    startTime: started_at,
    endTime: ended_at,
    input: span?.call?.tool_args,
    output: span?.result?.result,
    metadata: meta,
    level: level,
    statusMessage: to_string(attrs?.error ?? ""),
  }
  // The event id is derived from the span id, distinct from the
  // observation id itself.
  let event = {
    id: span_id + ":obs",
    type: "observation-create",
    timestamp: sent_at,
    body: observation,
  }
  return json_stringify({batch: [event]})
}
// Convert a millisecond value to nanoseconds (1 ms = 1,000,000 ns).
fn __tt_ms_to_nano(ms) {
  let nanos_per_ms = 1000000
  return ms * nanos_per_ms
}
// Convert span events into OpenTelemetry-style event records: `name` is
// stringified, `time_ms` becomes `time_unix_nano` (0 when missing), and
// `attributes` is carried over only when it is a dict. Non-dict entries
// are dropped.
fn __tt_otel_events(events) {
  var out = []
  for raw in events {
    if type_of(raw) == "dict" {
      let base = {
        name: to_string(raw?.name ?? ""),
        time_unix_nano: __tt_ms_to_nano(raw?.time_ms ?? 0),
      }
      if type_of(raw?.attributes) == "dict" {
        out = out.push(base + {attributes: raw.attributes})
      } else {
        out = out.push(base)
      }
    }
  }
  return out
}