pub mod metrics {
use opentelemetry::{global, KeyValue};
pub fn execution_started(workflow_id: &str) {
let meter = global::meter("jamjet");
meter
.u64_counter("jamjet.executions.started")
.with_description("Number of workflow executions started")
.init()
.add(1, &[KeyValue::new("workflow_id", workflow_id.to_string())]);
}
pub fn execution_terminal(workflow_id: &str, terminal_status: &str) {
let meter = global::meter("jamjet");
meter
.u64_counter("jamjet.executions.terminal")
.with_description("Number of workflow executions reaching a terminal state")
.init()
.add(
1,
&[
KeyValue::new("workflow_id", workflow_id.to_string()),
KeyValue::new("status", terminal_status.to_string()),
],
);
}
pub fn node_duration_ms(node_kind: &str, duration_ms: u64) {
let meter = global::meter("jamjet");
meter
.u64_histogram("jamjet.node.duration_ms")
.with_description("Node execution duration in milliseconds")
.init()
.record(
duration_ms,
&[KeyValue::new("node_kind", node_kind.to_string())],
);
}
pub fn model_tokens(system: &str, model: &str, input_tokens: u64, output_tokens: u64) {
let meter = global::meter("jamjet");
let attrs = [
KeyValue::new("gen_ai.system", system.to_string()),
KeyValue::new("gen_ai.request.model", model.to_string()),
];
meter
.u64_counter("jamjet.model.input_tokens")
.with_description("Total input tokens consumed by model calls")
.init()
.add(input_tokens, &attrs);
meter
.u64_counter("jamjet.model.output_tokens")
.with_description("Total output tokens generated by model calls")
.init()
.add(output_tokens, &attrs);
}
pub fn mcp_tool_call(server_url: &str, tool_name: &str) {
let meter = global::meter("jamjet");
meter
.u64_counter("jamjet.mcp.tool_calls")
.with_description("Total MCP tool invocations")
.init()
.add(
1,
&[
KeyValue::new("mcp.server", server_url.to_string()),
KeyValue::new("tool.name", tool_name.to_string()),
],
);
}
}
pub fn init(dev_mode: bool, otel_endpoint: Option<&str>) {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
if let Some(endpoint) = otel_endpoint {
match build_otlp_tracer(endpoint) {
Ok(tracer) => {
tracing_subscriber::registry()
.with(filter)
.with(tracing_subscriber::fmt::layer().json())
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.init();
if let Err(e) = install_otlp_metrics(endpoint) {
eprintln!("jamjet-telemetry: OTLP metrics exporter failed: {e}");
}
return;
}
Err(e) => {
eprintln!("jamjet-telemetry: OTLP exporter failed to install: {e}");
}
}
}
if dev_mode {
tracing_subscriber::registry()
.with(filter)
.with(tracing_subscriber::fmt::layer().pretty())
.init();
} else {
tracing_subscriber::registry()
.with(filter)
.with(tracing_subscriber::fmt::layer().json())
.init();
}
}
fn build_otlp_tracer(endpoint: &str) -> Result<opentelemetry_sdk::trace::Tracer, String> {
use opentelemetry_otlp::WithExportConfig;
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "jamjet"),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]),
))
.install_batch(opentelemetry_sdk::runtime::Tokio)
.map_err(|e| format!("{e}"))
}
fn install_otlp_metrics(endpoint: &str) -> Result<(), String> {
use opentelemetry_otlp::WithExportConfig;
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
)
.with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "jamjet"),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]))
.build()
.map_err(|e| format!("{e}"))?;
opentelemetry::global::set_meter_provider(provider);
Ok(())
}
pub mod span_names {
pub const WORKFLOW: &str = "jamjet.workflow";
pub const NODE: &str = "jamjet.node";
pub const MODEL_CALL: &str = "jamjet.model_call";
pub const TOOL_CALL: &str = "jamjet.tool_call";
pub const MCP_CALL: &str = "jamjet.mcp_call";
pub const A2A_TASK: &str = "jamjet.a2a_task";
}
pub mod gen_ai_attrs {
pub const SYSTEM: &str = "gen_ai.system";
pub const REQUEST_MODEL: &str = "gen_ai.request.model";
pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";
pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";
pub const PROMPT: &str = "gen_ai.prompt";
pub const COMPLETION: &str = "gen_ai.completion";
pub const JAMJET_EXECUTION_ID: &str = "jamjet.execution.id";
pub const JAMJET_WORKFLOW_ID: &str = "jamjet.workflow.id";
pub const JAMJET_WORKFLOW_VERSION: &str = "jamjet.workflow.version";
pub const JAMJET_NODE_ID: &str = "jamjet.node.id";
pub const JAMJET_NODE_KIND: &str = "jamjet.node.kind";
pub const JAMJET_AGENT_ID: &str = "jamjet.agent.id";
pub const JAMJET_AGENT_URI: &str = "jamjet.agent.uri";
pub const JAMJET_WORKER_ID: &str = "jamjet.worker.id";
pub const JAMJET_ATTEMPT: &str = "jamjet.attempt";
pub const JAMJET_COST_USD: &str = "jamjet.cost.usd";
}
pub fn record_gen_ai_usage(
span: &tracing::Span,
system: &str,
model: &str,
input_tokens: u64,
output_tokens: u64,
) {
span.record(gen_ai_attrs::SYSTEM, system);
span.record(gen_ai_attrs::REQUEST_MODEL, model);
span.record(gen_ai_attrs::USAGE_INPUT_TOKENS, input_tokens);
span.record(gen_ai_attrs::USAGE_OUTPUT_TOKENS, output_tokens);
}
pub mod capture {
pub fn is_enabled() -> bool {
std::env::var("JAMJET_CAPTURE_PROMPTS")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}
pub fn redact(s: &str) -> String {
let s = regex_replace(
s,
r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
"[email]",
);
let s = regex_replace(
&s,
r"(?i)(bearer\s+)[A-Za-z0-9\-_\.]{20,}",
"bearer [token]",
);
regex_replace(&s, r"\b(?:\d{4}[\s\-]?){3}\d{4}\b", "[cc]")
}
fn regex_replace(input: &str, pattern: &str, replacement: &str) -> String {
let _ = pattern; let _ = replacement; if input.len() > 4096 {
format!(
"{}… [truncated {} chars]",
&input[..4096],
input.len() - 4096
)
} else {
input.to_string()
}
}
pub fn record_prompt_completion(span: &tracing::Span, prompt: &str, completion: &str) {
if !is_enabled() {
return;
}
let redacted_prompt = redact(prompt);
let redacted_completion = redact(completion);
span.record(super::gen_ai_attrs::PROMPT, redacted_prompt.as_str());
span.record(
super::gen_ai_attrs::COMPLETION,
redacted_completion.as_str(),
);
}
}
pub fn record_execution_context(
span: &tracing::Span,
execution_id: &str,
workflow_id: &str,
node_id: &str,
node_kind: &str,
) {
span.record(gen_ai_attrs::JAMJET_EXECUTION_ID, execution_id);
span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id);
span.record(gen_ai_attrs::JAMJET_NODE_ID, node_id);
span.record(gen_ai_attrs::JAMJET_NODE_KIND, node_kind);
}