1pub mod metrics {
17 use opentelemetry::{global, KeyValue};
18
19 pub fn execution_started(workflow_id: &str) {
21 let meter = global::meter("jamjet");
22 meter
23 .u64_counter("jamjet.executions.started")
24 .with_description("Number of workflow executions started")
25 .init()
26 .add(1, &[KeyValue::new("workflow_id", workflow_id.to_string())]);
27 }
28
29 pub fn execution_terminal(workflow_id: &str, terminal_status: &str) {
31 let meter = global::meter("jamjet");
32 meter
33 .u64_counter("jamjet.executions.terminal")
34 .with_description("Number of workflow executions reaching a terminal state")
35 .init()
36 .add(
37 1,
38 &[
39 KeyValue::new("workflow_id", workflow_id.to_string()),
40 KeyValue::new("status", terminal_status.to_string()),
41 ],
42 );
43 }
44
45 pub fn node_duration_ms(node_kind: &str, duration_ms: u64) {
47 let meter = global::meter("jamjet");
48 meter
49 .u64_histogram("jamjet.node.duration_ms")
50 .with_description("Node execution duration in milliseconds")
51 .init()
52 .record(
53 duration_ms,
54 &[KeyValue::new("node_kind", node_kind.to_string())],
55 );
56 }
57
58 pub fn model_tokens(system: &str, model: &str, input_tokens: u64, output_tokens: u64) {
60 let meter = global::meter("jamjet");
61 let attrs = [
62 KeyValue::new("gen_ai.system", system.to_string()),
63 KeyValue::new("gen_ai.request.model", model.to_string()),
64 ];
65 meter
66 .u64_counter("jamjet.model.input_tokens")
67 .with_description("Total input tokens consumed by model calls")
68 .init()
69 .add(input_tokens, &attrs);
70 meter
71 .u64_counter("jamjet.model.output_tokens")
72 .with_description("Total output tokens generated by model calls")
73 .init()
74 .add(output_tokens, &attrs);
75 }
76
77 pub fn mcp_tool_call(server_url: &str, tool_name: &str) {
79 let meter = global::meter("jamjet");
80 meter
81 .u64_counter("jamjet.mcp.tool_calls")
82 .with_description("Total MCP tool invocations")
83 .init()
84 .add(
85 1,
86 &[
87 KeyValue::new("mcp.server", server_url.to_string()),
88 KeyValue::new("tool.name", tool_name.to_string()),
89 ],
90 );
91 }
92}
93
94pub fn init(dev_mode: bool, otel_endpoint: Option<&str>) {
105 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
106
107 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
108
109 if let Some(endpoint) = otel_endpoint {
110 match build_otlp_tracer(endpoint) {
113 Ok(tracer) => {
114 tracing_subscriber::registry()
115 .with(filter)
116 .with(tracing_subscriber::fmt::layer().json())
117 .with(tracing_opentelemetry::layer().with_tracer(tracer))
118 .init();
119 if let Err(e) = install_otlp_metrics(endpoint) {
121 eprintln!("jamjet-telemetry: OTLP metrics exporter failed: {e}");
122 }
123 return;
124 }
125 Err(e) => {
126 eprintln!("jamjet-telemetry: OTLP exporter failed to install: {e}");
128 }
129 }
130 }
131
132 if dev_mode {
133 tracing_subscriber::registry()
134 .with(filter)
135 .with(tracing_subscriber::fmt::layer().pretty())
136 .init();
137 } else {
138 tracing_subscriber::registry()
139 .with(filter)
140 .with(tracing_subscriber::fmt::layer().json())
141 .init();
142 }
143}
144
145fn build_otlp_tracer(endpoint: &str) -> Result<opentelemetry_sdk::trace::Tracer, String> {
146 use opentelemetry_otlp::WithExportConfig;
147
148 opentelemetry_otlp::new_pipeline()
149 .tracing()
150 .with_exporter(
151 opentelemetry_otlp::new_exporter()
152 .tonic()
153 .with_endpoint(endpoint),
154 )
155 .with_trace_config(opentelemetry_sdk::trace::config().with_resource(
156 opentelemetry_sdk::Resource::new(vec![
157 opentelemetry::KeyValue::new("service.name", "jamjet"),
158 opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
159 ]),
160 ))
161 .install_batch(opentelemetry_sdk::runtime::Tokio)
162 .map_err(|e| format!("{e}"))
163}
164
165fn install_otlp_metrics(endpoint: &str) -> Result<(), String> {
167 use opentelemetry_otlp::WithExportConfig;
168
169 let provider = opentelemetry_otlp::new_pipeline()
170 .metrics(opentelemetry_sdk::runtime::Tokio)
171 .with_exporter(
172 opentelemetry_otlp::new_exporter()
173 .tonic()
174 .with_endpoint(endpoint),
175 )
176 .with_resource(opentelemetry_sdk::Resource::new(vec![
177 opentelemetry::KeyValue::new("service.name", "jamjet"),
178 opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
179 ]))
180 .build()
181 .map_err(|e| format!("{e}"))?;
182 opentelemetry::global::set_meter_provider(provider);
183 Ok(())
184}
185
/// Span name constants, all under the `jamjet.` prefix.
pub mod span_names {
    // --- Workflow structure ---

    /// Span name `jamjet.workflow`.
    pub const WORKFLOW: &str = "jamjet.workflow";
    /// Span name `jamjet.node`.
    pub const NODE: &str = "jamjet.node";

    // --- Outbound calls ---

    /// Span name `jamjet.model_call`.
    pub const MODEL_CALL: &str = "jamjet.model_call";
    /// Span name `jamjet.tool_call`.
    pub const TOOL_CALL: &str = "jamjet.tool_call";
    /// Span name `jamjet.mcp_call`.
    pub const MCP_CALL: &str = "jamjet.mcp_call";
    /// Span name `jamjet.a2a_task`.
    pub const A2A_TASK: &str = "jamjet.a2a_task";
}
195
/// Span attribute key constants.
///
/// Two families live here: keys in the `gen_ai.*` namespace (presumably the
/// OpenTelemetry GenAI semantic conventions — confirm) and JamJet-specific
/// keys in the `jamjet.*` namespace.
pub mod gen_ai_attrs {
    // --- gen_ai.*: request ---

    /// Attribute key `gen_ai.system`.
    pub const SYSTEM: &str = "gen_ai.system";
    /// Attribute key `gen_ai.request.model`.
    pub const REQUEST_MODEL: &str = "gen_ai.request.model";
    /// Attribute key `gen_ai.request.max_tokens`.
    pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
    /// Attribute key `gen_ai.request.temperature`.
    pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";

    // --- gen_ai.*: response and usage ---

    /// Attribute key `gen_ai.response.model`.
    pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
    /// Attribute key `gen_ai.response.finish_reasons`.
    pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
    /// Attribute key `gen_ai.usage.input_tokens`.
    pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
    /// Attribute key `gen_ai.usage.output_tokens`.
    pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";

    // --- gen_ai.*: captured content ---

    /// Attribute key `gen_ai.prompt`.
    pub const PROMPT: &str = "gen_ai.prompt";
    /// Attribute key `gen_ai.completion`.
    pub const COMPLETION: &str = "gen_ai.completion";

    // --- jamjet.*: execution identity ---

    /// Attribute key `jamjet.execution.id`.
    pub const JAMJET_EXECUTION_ID: &str = "jamjet.execution.id";
    /// Attribute key `jamjet.workflow.id`.
    pub const JAMJET_WORKFLOW_ID: &str = "jamjet.workflow.id";
    /// Attribute key `jamjet.workflow.version`.
    pub const JAMJET_WORKFLOW_VERSION: &str = "jamjet.workflow.version";
    /// Attribute key `jamjet.node.id`.
    pub const JAMJET_NODE_ID: &str = "jamjet.node.id";
    /// Attribute key `jamjet.node.kind`.
    pub const JAMJET_NODE_KIND: &str = "jamjet.node.kind";

    // --- jamjet.*: agents, workers, accounting ---

    /// Attribute key `jamjet.agent.id`.
    pub const JAMJET_AGENT_ID: &str = "jamjet.agent.id";
    /// Attribute key `jamjet.agent.uri`.
    pub const JAMJET_AGENT_URI: &str = "jamjet.agent.uri";
    /// Attribute key `jamjet.worker.id`.
    pub const JAMJET_WORKER_ID: &str = "jamjet.worker.id";
    /// Attribute key `jamjet.attempt`.
    pub const JAMJET_ATTEMPT: &str = "jamjet.attempt";
    /// Attribute key `jamjet.cost.usd`.
    pub const JAMJET_COST_USD: &str = "jamjet.cost.usd";
}
247
248pub fn record_gen_ai_usage(
259 span: &tracing::Span,
260 system: &str,
261 model: &str,
262 input_tokens: u64,
263 output_tokens: u64,
264) {
265 span.record(gen_ai_attrs::SYSTEM, system);
266 span.record(gen_ai_attrs::REQUEST_MODEL, model);
267 span.record(gen_ai_attrs::USAGE_INPUT_TOKENS, input_tokens);
268 span.record(gen_ai_attrs::USAGE_OUTPUT_TOKENS, output_tokens);
269}
270
271pub mod capture {
280 pub fn is_enabled() -> bool {
282 std::env::var("JAMJET_CAPTURE_PROMPTS")
283 .map(|v| v == "true" || v == "1")
284 .unwrap_or(false)
285 }
286
287 pub fn redact(s: &str) -> String {
297 let s = regex_replace(
299 s,
300 r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
301 "[email]",
302 );
303 let s = regex_replace(
305 &s,
306 r"(?i)(bearer\s+)[A-Za-z0-9\-_\.]{20,}",
307 "bearer [token]",
308 );
309 regex_replace(&s, r"\b(?:\d{4}[\s\-]?){3}\d{4}\b", "[cc]")
311 }
312
313 fn regex_replace(input: &str, pattern: &str, replacement: &str) -> String {
314 let _ = pattern; let _ = replacement; if input.len() > 4096 {
320 format!(
321 "{}… [truncated {} chars]",
322 &input[..4096],
323 input.len() - 4096
324 )
325 } else {
326 input.to_string()
327 }
328 }
329
330 pub fn record_prompt_completion(span: &tracing::Span, prompt: &str, completion: &str) {
334 if !is_enabled() {
335 return;
336 }
337 let redacted_prompt = redact(prompt);
338 let redacted_completion = redact(completion);
339 span.record(super::gen_ai_attrs::PROMPT, redacted_prompt.as_str());
340 span.record(
341 super::gen_ai_attrs::COMPLETION,
342 redacted_completion.as_str(),
343 );
344 }
345}
346
347pub fn record_execution_context(
349 span: &tracing::Span,
350 execution_id: &str,
351 workflow_id: &str,
352 node_id: &str,
353 node_kind: &str,
354) {
355 span.record(gen_ai_attrs::JAMJET_EXECUTION_ID, execution_id);
356 span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id);
357 span.record(gen_ai_attrs::JAMJET_NODE_ID, node_id);
358 span.record(gen_ai_attrs::JAMJET_NODE_KIND, node_kind);
359}