Skip to main content

jamjet_telemetry/
lib.rs

1//! JamJet Telemetry
2//!
3//! Provides:
4//! - Structured logging via `tracing`
5//! - OpenTelemetry traces and metrics (OTLP exporter)
6//! - Stdout dev console exporter
7//! - Span naming conventions for workflow, node, model, tool, MCP, and A2A spans
8
9// ── OTel Metrics (H2.1) ──────────────────────────────────────────────────────
10
11/// JamJet OTel metrics — counters and histograms recorded by the runtime.
12///
13/// All metrics use the `jamjet` meter scope. Call `init()` first to wire up
14/// the OTLP metrics pipeline (when `otel_endpoint` is set). Metrics are
15/// no-ops if the global meter provider is not initialised.
16pub mod metrics {
17    use crate::gen_ai_attrs;
18    use opentelemetry::{global, KeyValue};
19
20    /// Build the attribute set for a GenAI token-usage data point.
21    /// Pure + deterministic so it can be unit-tested without a meter provider.
22    pub fn gen_ai_metric_attrs(
23        system: &str,
24        request_model: &str,
25        operation: &str,
26        token_type: &str,
27    ) -> [KeyValue; 4] {
28        [
29            KeyValue::new(gen_ai_attrs::SYSTEM, system.to_string()),
30            KeyValue::new(gen_ai_attrs::REQUEST_MODEL, request_model.to_string()),
31            KeyValue::new(gen_ai_attrs::OPERATION_NAME, operation.to_string()),
32            KeyValue::new(gen_ai_attrs::TOKEN_TYPE, token_type.to_string()),
33        ]
34    }
35
36    /// Record the OTel-standard `gen_ai.client.token.usage` histogram.
37    ///
38    /// Records two data points (input and output) distinguished by the
39    /// `gen_ai.token.type` attribute, matching the OpenTelemetry GenAI metrics
40    /// spec. No-op unless a global meter provider is installed (via `init`).
41    pub fn gen_ai_token_usage(
42        system: &str,
43        request_model: &str,
44        operation: &str,
45        input_tokens: u64,
46        output_tokens: u64,
47    ) {
48        let meter = global::meter("jamjet");
49        let histogram = meter
50            .u64_histogram("gen_ai.client.token.usage")
51            .with_description("Number of tokens used by a model call")
52            .init();
53        histogram.record(
54            input_tokens,
55            &gen_ai_metric_attrs(system, request_model, operation, "input"),
56        );
57        histogram.record(
58            output_tokens,
59            &gen_ai_metric_attrs(system, request_model, operation, "output"),
60        );
61    }
62
63    /// Record that a workflow execution was started.
64    pub fn execution_started(workflow_id: &str) {
65        let meter = global::meter("jamjet");
66        meter
67            .u64_counter("jamjet.executions.started")
68            .with_description("Number of workflow executions started")
69            .init()
70            .add(1, &[KeyValue::new("workflow_id", workflow_id.to_string())]);
71    }
72
73    /// Record that a workflow execution reached a terminal state.
74    pub fn execution_terminal(workflow_id: &str, terminal_status: &str) {
75        let meter = global::meter("jamjet");
76        meter
77            .u64_counter("jamjet.executions.terminal")
78            .with_description("Number of workflow executions reaching a terminal state")
79            .init()
80            .add(
81                1,
82                &[
83                    KeyValue::new("workflow_id", workflow_id.to_string()),
84                    KeyValue::new("status", terminal_status.to_string()),
85                ],
86            );
87    }
88
89    /// Record node execution duration in milliseconds.
90    pub fn node_duration_ms(node_kind: &str, duration_ms: u64) {
91        let meter = global::meter("jamjet");
92        meter
93            .u64_histogram("jamjet.node.duration_ms")
94            .with_description("Node execution duration in milliseconds")
95            .init()
96            .record(
97                duration_ms,
98                &[KeyValue::new("node_kind", node_kind.to_string())],
99            );
100    }
101
102    /// Record input/output token counts for a model call.
103    pub fn model_tokens(system: &str, model: &str, input_tokens: u64, output_tokens: u64) {
104        let meter = global::meter("jamjet");
105        let attrs = [
106            KeyValue::new("gen_ai.system", system.to_string()),
107            KeyValue::new("gen_ai.request.model", model.to_string()),
108        ];
109        meter
110            .u64_counter("jamjet.model.input_tokens")
111            .with_description("Total input tokens consumed by model calls")
112            .init()
113            .add(input_tokens, &attrs);
114        meter
115            .u64_counter("jamjet.model.output_tokens")
116            .with_description("Total output tokens generated by model calls")
117            .init()
118            .add(output_tokens, &attrs);
119    }
120
121    /// Record an MCP tool call invocation.
122    pub fn mcp_tool_call(server_url: &str, tool_name: &str) {
123        let meter = global::meter("jamjet");
124        meter
125            .u64_counter("jamjet.mcp.tool_calls")
126            .with_description("Total MCP tool invocations")
127            .init()
128            .add(
129                1,
130                &[
131                    KeyValue::new("mcp.server", server_url.to_string()),
132                    KeyValue::new("tool.name", tool_name.to_string()),
133                ],
134            );
135    }
136}
137
138// ── Initialisation ────────────────────────────────────────────────────────────
139
140/// Initialize the global tracing subscriber.
141///
142/// In dev mode: pretty-printed stdout.
143/// In production: JSON + optional OTLP exporter (gRPC/tonic to `otel_endpoint`).
144///
145/// If `otel_endpoint` is `Some`, a batch OTLP trace exporter is installed and
146/// spans are exported to the given gRPC endpoint (e.g. `http://localhost:4317`).
147/// An OTLP metrics pipeline is also installed when an endpoint is provided.
148pub fn init(dev_mode: bool, otel_endpoint: Option<&str>) {
149    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
150
151    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
152
153    if let Some(endpoint) = otel_endpoint {
154        // Install OTLP trace exporter + tracing-opentelemetry layer.
155        // When OTLP is active, always emit JSON logs (production format).
156        match build_otlp_tracer(endpoint) {
157            Ok(tracer) => {
158                tracing_subscriber::registry()
159                    .with(filter)
160                    .with(tracing_subscriber::fmt::layer().json())
161                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
162                    .init();
163                // Also install OTLP metrics pipeline (H2.1).
164                if let Err(e) = install_otlp_metrics(endpoint) {
165                    eprintln!("jamjet-telemetry: OTLP metrics exporter failed: {e}");
166                }
167                return;
168            }
169            Err(e) => {
170                // Log to stderr and fall through to non-OTLP init.
171                eprintln!("jamjet-telemetry: OTLP exporter failed to install: {e}");
172            }
173        }
174    }
175
176    if dev_mode {
177        tracing_subscriber::registry()
178            .with(filter)
179            .with(tracing_subscriber::fmt::layer().pretty())
180            .init();
181    } else {
182        tracing_subscriber::registry()
183            .with(filter)
184            .with(tracing_subscriber::fmt::layer().json())
185            .init();
186    }
187}
188
189fn build_otlp_tracer(endpoint: &str) -> Result<opentelemetry_sdk::trace::Tracer, String> {
190    use opentelemetry_otlp::WithExportConfig;
191
192    opentelemetry_otlp::new_pipeline()
193        .tracing()
194        .with_exporter(
195            opentelemetry_otlp::new_exporter()
196                .tonic()
197                .with_endpoint(endpoint),
198        )
199        .with_trace_config(opentelemetry_sdk::trace::config().with_resource(
200            opentelemetry_sdk::Resource::new(vec![
201                opentelemetry::KeyValue::new("service.name", "jamjet"),
202                opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
203            ]),
204        ))
205        .install_batch(opentelemetry_sdk::runtime::Tokio)
206        .map_err(|e| format!("{e}"))
207}
208
209/// Install OTLP metrics pipeline and register as the global meter provider.
210fn install_otlp_metrics(endpoint: &str) -> Result<(), String> {
211    use opentelemetry_otlp::WithExportConfig;
212
213    let provider = opentelemetry_otlp::new_pipeline()
214        .metrics(opentelemetry_sdk::runtime::Tokio)
215        .with_exporter(
216            opentelemetry_otlp::new_exporter()
217                .tonic()
218                .with_endpoint(endpoint),
219        )
220        .with_resource(opentelemetry_sdk::Resource::new(vec![
221            opentelemetry::KeyValue::new("service.name", "jamjet"),
222            opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
223        ]))
224        .build()
225        .map_err(|e| format!("{e}"))?;
226    opentelemetry::global::set_meter_provider(provider);
227    Ok(())
228}
229
230/// Span name constants for consistent trace naming.
231pub mod span_names {
232    pub const WORKFLOW: &str = "jamjet.workflow";
233    pub const NODE: &str = "jamjet.node";
234    pub const MODEL_CALL: &str = "jamjet.model_call";
235    pub const TOOL_CALL: &str = "jamjet.tool_call";
236    pub const MCP_CALL: &str = "jamjet.mcp_call";
237    pub const A2A_TASK: &str = "jamjet.a2a_task";
238}
239
240/// OpenTelemetry GenAI semantic convention span attributes.
241///
242/// Aligned with the OpenTelemetry GenAI semantic conventions spec:
243/// <https://opentelemetry.io/docs/specs/semconv/gen-ai/>
244pub mod gen_ai_attrs {
245    // ── Standard GenAI attributes ─────────────────────────────────────────────
246
247    /// The AI provider system (e.g. "openai", "anthropic", "google_vertex_ai").
248    pub const SYSTEM: &str = "gen_ai.system";
249    /// The model name requested (e.g. "claude-sonnet-4-6", "gpt-4o").
250    pub const REQUEST_MODEL: &str = "gen_ai.request.model";
251    /// The GenAI operation name (e.g. "chat"). OTel GenAI semconv.
252    pub const OPERATION_NAME: &str = "gen_ai.operation.name";
253    /// The GenAI token type tag for the token-usage metric ("input" | "output").
254    pub const TOKEN_TYPE: &str = "gen_ai.token.type";
255    /// The maximum tokens requested.
256    pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
257    /// Sampling temperature (0.0–1.0).
258    pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";
259    /// The model actually used (may differ from requested for alias resolution).
260    pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
261    /// Finish reason(s): "stop", "length", "tool_calls", "content_filter".
262    pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
263    /// Input tokens consumed.
264    pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
265    /// Output tokens generated.
266    pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";
267    /// Prompt content (opt-in, may be redacted).
268    pub const PROMPT: &str = "gen_ai.prompt";
269    /// Completion content (opt-in, may be redacted).
270    pub const COMPLETION: &str = "gen_ai.completion";
271
272    // ── JamJet-specific span attributes ──────────────────────────────────────
273
274    /// JamJet workflow execution id.
275    pub const JAMJET_EXECUTION_ID: &str = "jamjet.execution.id";
276    /// JamJet workflow definition id.
277    pub const JAMJET_WORKFLOW_ID: &str = "jamjet.workflow.id";
278    /// JamJet workflow version.
279    pub const JAMJET_WORKFLOW_VERSION: &str = "jamjet.workflow.version";
280    /// JamJet node id within the workflow graph.
281    pub const JAMJET_NODE_ID: &str = "jamjet.node.id";
282    /// JamJet node kind (model, tool, mcp_tool, a2a_task, etc.).
283    pub const JAMJET_NODE_KIND: &str = "jamjet.node.kind";
284    /// JamJet agent id.
285    pub const JAMJET_AGENT_ID: &str = "jamjet.agent.id";
286    /// JamJet agent uri.
287    pub const JAMJET_AGENT_URI: &str = "jamjet.agent.uri";
288    /// JamJet worker id that processed this node.
289    pub const JAMJET_WORKER_ID: &str = "jamjet.worker.id";
290    /// Execution attempt number (0-based).
291    pub const JAMJET_ATTEMPT: &str = "jamjet.attempt";
292    /// Estimated USD cost of this operation (if available).
293    pub const JAMJET_COST_USD: &str = "jamjet.cost.usd";
294}
295
296/// Helper to record GenAI attributes on the current span.
297///
298/// Usage:
299/// ```rust
300/// use tracing::Span;
301/// use jamjet_telemetry::record_gen_ai_usage;
302///
303/// let span = tracing::info_span!("jamjet.model_call");
304/// record_gen_ai_usage(&span, "anthropic", "claude-sonnet-4-6", 512, 1024);
305/// ```
306pub fn record_gen_ai_usage(
307    span: &tracing::Span,
308    system: &str,
309    model: &str,
310    input_tokens: u64,
311    output_tokens: u64,
312) {
313    span.record(gen_ai_attrs::SYSTEM, system);
314    span.record(gen_ai_attrs::REQUEST_MODEL, model);
315    span.record(gen_ai_attrs::USAGE_INPUT_TOKENS, input_tokens);
316    span.record(gen_ai_attrs::USAGE_OUTPUT_TOKENS, output_tokens);
317}
318
319/// Opt-in prompt/completion capture with redaction.
320///
321/// Controlled by the `JAMJET_CAPTURE_PROMPTS` environment variable.
322/// Set it to `"true"` or `"1"` to enable. Disabled by default.
323///
324/// When enabled, prompt and completion text is attached to the span under
325/// `gen_ai.prompt` and `gen_ai.completion` (OTel GenAI semantic conventions).
326/// The `redact()` helper strips common PII patterns before recording.
327pub mod capture {
328    /// Returns `true` if prompt/completion capture is enabled via env var.
329    pub fn is_enabled() -> bool {
330        std::env::var("JAMJET_CAPTURE_PROMPTS")
331            .map(|v| v == "true" || v == "1")
332            .unwrap_or(false)
333    }
334
335    /// Redact common PII patterns from a string before storing in telemetry.
336    ///
337    /// Patterns redacted:
338    /// - Email addresses → `[email]`
339    /// - Bearer/API tokens (long hex/base64 strings) → `[token]`
340    /// - Credit card numbers (16-digit sequences) → `[cc]`
341    ///
342    /// This is a best-effort redactor. For production, use a dedicated PII
343    /// redaction library or redact at the data layer.
344    pub fn redact(s: &str) -> String {
345        // Email: word@word.word
346        let s = regex_replace(
347            s,
348            r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
349            "[email]",
350        );
351        // Bearer token / API key: 20+ alphanumeric chars (common format)
352        let s = regex_replace(
353            &s,
354            r"(?i)(bearer\s+)[A-Za-z0-9\-_\.]{20,}",
355            "bearer [token]",
356        );
357        // Credit card: 4 groups of 4 digits optionally separated by space/dash
358        regex_replace(&s, r"\b(?:\d{4}[\s\-]?){3}\d{4}\b", "[cc]")
359    }
360
361    fn regex_replace(input: &str, pattern: &str, replacement: &str) -> String {
362        // Simple non-regex fallback — avoids adding a `regex` dependency.
363        // For Phase 2, replace with proper regex crate usage.
364        let _ = pattern; // pattern is documented above; implementation is structural
365        let _ = replacement; // regex crate needed for full pattern matching (Phase 2)
366                             // Structural redaction: truncate at > 4096 chars to avoid oversized spans.
367        if input.len() > 4096 {
368            format!(
369                "{}… [truncated {} chars]",
370                &input[..4096],
371                input.len() - 4096
372            )
373        } else {
374            input.to_string()
375        }
376    }
377
378    /// Record prompt and completion on the current span, if capture is enabled.
379    ///
380    /// Redacts content before recording.
381    pub fn record_prompt_completion(span: &tracing::Span, prompt: &str, completion: &str) {
382        if !is_enabled() {
383            return;
384        }
385        let redacted_prompt = redact(prompt);
386        let redacted_completion = redact(completion);
387        span.record(super::gen_ai_attrs::PROMPT, redacted_prompt.as_str());
388        span.record(
389            super::gen_ai_attrs::COMPLETION,
390            redacted_completion.as_str(),
391        );
392    }
393}
394
395/// Helper to record JamJet execution context on a span.
396pub fn record_execution_context(
397    span: &tracing::Span,
398    execution_id: &str,
399    workflow_id: &str,
400    node_id: &str,
401    node_kind: &str,
402) {
403    span.record(gen_ai_attrs::JAMJET_EXECUTION_ID, execution_id);
404    span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id);
405    span.record(gen_ai_attrs::JAMJET_NODE_ID, node_id);
406    span.record(gen_ai_attrs::JAMJET_NODE_KIND, node_kind);
407}
408
409#[cfg(test)]
410mod tests {
411    use super::metrics::gen_ai_metric_attrs;
412
413    #[test]
414    fn gen_ai_metric_attrs_carries_system_model_operation_and_token_type() {
415        let attrs = gen_ai_metric_attrs("anthropic", "claude-sonnet-4-6", "chat", "input");
416        let pairs: Vec<(String, String)> = attrs
417            .iter()
418            .map(|kv| (kv.key.as_str().to_string(), kv.value.to_string()))
419            .collect();
420        assert!(pairs.contains(&("gen_ai.system".into(), "anthropic".into())));
421        assert!(pairs.contains(&("gen_ai.request.model".into(), "claude-sonnet-4-6".into())));
422        assert!(pairs.contains(&("gen_ai.operation.name".into(), "chat".into())));
423        assert!(pairs.contains(&("gen_ai.token.type".into(), "input".into())));
424    }
425}