1pub mod metrics {
17 use crate::gen_ai_attrs;
18 use opentelemetry::{global, KeyValue};
19
20 pub fn gen_ai_metric_attrs(
23 system: &str,
24 request_model: &str,
25 operation: &str,
26 token_type: &str,
27 ) -> [KeyValue; 4] {
28 [
29 KeyValue::new(gen_ai_attrs::SYSTEM, system.to_string()),
30 KeyValue::new(gen_ai_attrs::REQUEST_MODEL, request_model.to_string()),
31 KeyValue::new(gen_ai_attrs::OPERATION_NAME, operation.to_string()),
32 KeyValue::new(gen_ai_attrs::TOKEN_TYPE, token_type.to_string()),
33 ]
34 }
35
36 pub fn gen_ai_token_usage(
42 system: &str,
43 request_model: &str,
44 operation: &str,
45 input_tokens: u64,
46 output_tokens: u64,
47 ) {
48 let meter = global::meter("jamjet");
49 let histogram = meter
50 .u64_histogram("gen_ai.client.token.usage")
51 .with_description("Number of tokens used by a model call")
52 .init();
53 histogram.record(
54 input_tokens,
55 &gen_ai_metric_attrs(system, request_model, operation, "input"),
56 );
57 histogram.record(
58 output_tokens,
59 &gen_ai_metric_attrs(system, request_model, operation, "output"),
60 );
61 }
62
63 pub fn execution_started(workflow_id: &str) {
65 let meter = global::meter("jamjet");
66 meter
67 .u64_counter("jamjet.executions.started")
68 .with_description("Number of workflow executions started")
69 .init()
70 .add(1, &[KeyValue::new("workflow_id", workflow_id.to_string())]);
71 }
72
73 pub fn execution_terminal(workflow_id: &str, terminal_status: &str) {
75 let meter = global::meter("jamjet");
76 meter
77 .u64_counter("jamjet.executions.terminal")
78 .with_description("Number of workflow executions reaching a terminal state")
79 .init()
80 .add(
81 1,
82 &[
83 KeyValue::new("workflow_id", workflow_id.to_string()),
84 KeyValue::new("status", terminal_status.to_string()),
85 ],
86 );
87 }
88
89 pub fn node_duration_ms(node_kind: &str, duration_ms: u64) {
91 let meter = global::meter("jamjet");
92 meter
93 .u64_histogram("jamjet.node.duration_ms")
94 .with_description("Node execution duration in milliseconds")
95 .init()
96 .record(
97 duration_ms,
98 &[KeyValue::new("node_kind", node_kind.to_string())],
99 );
100 }
101
102 pub fn model_tokens(system: &str, model: &str, input_tokens: u64, output_tokens: u64) {
104 let meter = global::meter("jamjet");
105 let attrs = [
106 KeyValue::new("gen_ai.system", system.to_string()),
107 KeyValue::new("gen_ai.request.model", model.to_string()),
108 ];
109 meter
110 .u64_counter("jamjet.model.input_tokens")
111 .with_description("Total input tokens consumed by model calls")
112 .init()
113 .add(input_tokens, &attrs);
114 meter
115 .u64_counter("jamjet.model.output_tokens")
116 .with_description("Total output tokens generated by model calls")
117 .init()
118 .add(output_tokens, &attrs);
119 }
120
121 pub fn mcp_tool_call(server_url: &str, tool_name: &str) {
123 let meter = global::meter("jamjet");
124 meter
125 .u64_counter("jamjet.mcp.tool_calls")
126 .with_description("Total MCP tool invocations")
127 .init()
128 .add(
129 1,
130 &[
131 KeyValue::new("mcp.server", server_url.to_string()),
132 KeyValue::new("tool.name", tool_name.to_string()),
133 ],
134 );
135 }
136}
137
138pub fn init(dev_mode: bool, otel_endpoint: Option<&str>) {
149 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
150
151 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
152
153 if let Some(endpoint) = otel_endpoint {
154 match build_otlp_tracer(endpoint) {
157 Ok(tracer) => {
158 tracing_subscriber::registry()
159 .with(filter)
160 .with(tracing_subscriber::fmt::layer().json())
161 .with(tracing_opentelemetry::layer().with_tracer(tracer))
162 .init();
163 if let Err(e) = install_otlp_metrics(endpoint) {
165 eprintln!("jamjet-telemetry: OTLP metrics exporter failed: {e}");
166 }
167 return;
168 }
169 Err(e) => {
170 eprintln!("jamjet-telemetry: OTLP exporter failed to install: {e}");
172 }
173 }
174 }
175
176 if dev_mode {
177 tracing_subscriber::registry()
178 .with(filter)
179 .with(tracing_subscriber::fmt::layer().pretty())
180 .init();
181 } else {
182 tracing_subscriber::registry()
183 .with(filter)
184 .with(tracing_subscriber::fmt::layer().json())
185 .init();
186 }
187}
188
189fn build_otlp_tracer(endpoint: &str) -> Result<opentelemetry_sdk::trace::Tracer, String> {
190 use opentelemetry_otlp::WithExportConfig;
191
192 opentelemetry_otlp::new_pipeline()
193 .tracing()
194 .with_exporter(
195 opentelemetry_otlp::new_exporter()
196 .tonic()
197 .with_endpoint(endpoint),
198 )
199 .with_trace_config(opentelemetry_sdk::trace::config().with_resource(
200 opentelemetry_sdk::Resource::new(vec![
201 opentelemetry::KeyValue::new("service.name", "jamjet"),
202 opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
203 ]),
204 ))
205 .install_batch(opentelemetry_sdk::runtime::Tokio)
206 .map_err(|e| format!("{e}"))
207}
208
209fn install_otlp_metrics(endpoint: &str) -> Result<(), String> {
211 use opentelemetry_otlp::WithExportConfig;
212
213 let provider = opentelemetry_otlp::new_pipeline()
214 .metrics(opentelemetry_sdk::runtime::Tokio)
215 .with_exporter(
216 opentelemetry_otlp::new_exporter()
217 .tonic()
218 .with_endpoint(endpoint),
219 )
220 .with_resource(opentelemetry_sdk::Resource::new(vec![
221 opentelemetry::KeyValue::new("service.name", "jamjet"),
222 opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
223 ]))
224 .build()
225 .map_err(|e| format!("{e}"))?;
226 opentelemetry::global::set_meter_provider(provider);
227 Ok(())
228}
229
230pub mod span_names {
232 pub const WORKFLOW: &str = "jamjet.workflow";
233 pub const NODE: &str = "jamjet.node";
234 pub const MODEL_CALL: &str = "jamjet.model_call";
235 pub const TOOL_CALL: &str = "jamjet.tool_call";
236 pub const MCP_CALL: &str = "jamjet.mcp_call";
237 pub const A2A_TASK: &str = "jamjet.a2a_task";
238}
239
240pub mod gen_ai_attrs {
245 pub const SYSTEM: &str = "gen_ai.system";
249 pub const REQUEST_MODEL: &str = "gen_ai.request.model";
251 pub const OPERATION_NAME: &str = "gen_ai.operation.name";
253 pub const TOKEN_TYPE: &str = "gen_ai.token.type";
255 pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
257 pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";
259 pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
261 pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
263 pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
265 pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";
267 pub const PROMPT: &str = "gen_ai.prompt";
269 pub const COMPLETION: &str = "gen_ai.completion";
271
272 pub const JAMJET_EXECUTION_ID: &str = "jamjet.execution.id";
276 pub const JAMJET_WORKFLOW_ID: &str = "jamjet.workflow.id";
278 pub const JAMJET_WORKFLOW_VERSION: &str = "jamjet.workflow.version";
280 pub const JAMJET_NODE_ID: &str = "jamjet.node.id";
282 pub const JAMJET_NODE_KIND: &str = "jamjet.node.kind";
284 pub const JAMJET_AGENT_ID: &str = "jamjet.agent.id";
286 pub const JAMJET_AGENT_URI: &str = "jamjet.agent.uri";
288 pub const JAMJET_WORKER_ID: &str = "jamjet.worker.id";
290 pub const JAMJET_ATTEMPT: &str = "jamjet.attempt";
292 pub const JAMJET_COST_USD: &str = "jamjet.cost.usd";
294}
295
296pub fn record_gen_ai_usage(
307 span: &tracing::Span,
308 system: &str,
309 model: &str,
310 input_tokens: u64,
311 output_tokens: u64,
312) {
313 span.record(gen_ai_attrs::SYSTEM, system);
314 span.record(gen_ai_attrs::REQUEST_MODEL, model);
315 span.record(gen_ai_attrs::USAGE_INPUT_TOKENS, input_tokens);
316 span.record(gen_ai_attrs::USAGE_OUTPUT_TOKENS, output_tokens);
317}
318
319pub mod capture {
328 pub fn is_enabled() -> bool {
330 std::env::var("JAMJET_CAPTURE_PROMPTS")
331 .map(|v| v == "true" || v == "1")
332 .unwrap_or(false)
333 }
334
335 pub fn redact(s: &str) -> String {
345 let s = regex_replace(
347 s,
348 r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
349 "[email]",
350 );
351 let s = regex_replace(
353 &s,
354 r"(?i)(bearer\s+)[A-Za-z0-9\-_\.]{20,}",
355 "bearer [token]",
356 );
357 regex_replace(&s, r"\b(?:\d{4}[\s\-]?){3}\d{4}\b", "[cc]")
359 }
360
361 fn regex_replace(input: &str, pattern: &str, replacement: &str) -> String {
362 let _ = pattern; let _ = replacement; if input.len() > 4096 {
368 format!(
369 "{}… [truncated {} chars]",
370 &input[..4096],
371 input.len() - 4096
372 )
373 } else {
374 input.to_string()
375 }
376 }
377
378 pub fn record_prompt_completion(span: &tracing::Span, prompt: &str, completion: &str) {
382 if !is_enabled() {
383 return;
384 }
385 let redacted_prompt = redact(prompt);
386 let redacted_completion = redact(completion);
387 span.record(super::gen_ai_attrs::PROMPT, redacted_prompt.as_str());
388 span.record(
389 super::gen_ai_attrs::COMPLETION,
390 redacted_completion.as_str(),
391 );
392 }
393}
394
395pub fn record_execution_context(
397 span: &tracing::Span,
398 execution_id: &str,
399 workflow_id: &str,
400 node_id: &str,
401 node_kind: &str,
402) {
403 span.record(gen_ai_attrs::JAMJET_EXECUTION_ID, execution_id);
404 span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id);
405 span.record(gen_ai_attrs::JAMJET_NODE_ID, node_id);
406 span.record(gen_ai_attrs::JAMJET_NODE_KIND, node_kind);
407}
408
409#[cfg(test)]
410mod tests {
411 use super::metrics::gen_ai_metric_attrs;
412
413 #[test]
414 fn gen_ai_metric_attrs_carries_system_model_operation_and_token_type() {
415 let attrs = gen_ai_metric_attrs("anthropic", "claude-sonnet-4-6", "chat", "input");
416 let pairs: Vec<(String, String)> = attrs
417 .iter()
418 .map(|kv| (kv.key.as_str().to_string(), kv.value.to_string()))
419 .collect();
420 assert!(pairs.contains(&("gen_ai.system".into(), "anthropic".into())));
421 assert!(pairs.contains(&("gen_ai.request.model".into(), "claude-sonnet-4-6".into())));
422 assert!(pairs.contains(&("gen_ai.operation.name".into(), "chat".into())));
423 assert!(pairs.contains(&("gen_ai.token.type".into(), "input".into())));
424 }
425}