Skip to main content

jamjet_telemetry/
lib.rs

1//! JamJet Telemetry
2//!
3//! Provides:
4//! - Structured logging via `tracing`
5//! - OpenTelemetry traces and metrics (OTLP exporter)
6//! - Stdout dev console exporter
7//! - Span naming conventions for workflow, node, model, tool, MCP, and A2A spans
8
9// ── OTel Metrics (H2.1) ──────────────────────────────────────────────────────
10
11/// JamJet OTel metrics — counters and histograms recorded by the runtime.
12///
13/// All metrics use the `jamjet` meter scope. Call `init()` first to wire up
14/// the OTLP metrics pipeline (when `otel_endpoint` is set). Metrics are
15/// no-ops if the global meter provider is not initialised.
16pub mod metrics {
17    use opentelemetry::{global, KeyValue};
18
19    /// Record that a workflow execution was started.
20    pub fn execution_started(workflow_id: &str) {
21        let meter = global::meter("jamjet");
22        meter
23            .u64_counter("jamjet.executions.started")
24            .with_description("Number of workflow executions started")
25            .init()
26            .add(1, &[KeyValue::new("workflow_id", workflow_id.to_string())]);
27    }
28
29    /// Record that a workflow execution reached a terminal state.
30    pub fn execution_terminal(workflow_id: &str, terminal_status: &str) {
31        let meter = global::meter("jamjet");
32        meter
33            .u64_counter("jamjet.executions.terminal")
34            .with_description("Number of workflow executions reaching a terminal state")
35            .init()
36            .add(
37                1,
38                &[
39                    KeyValue::new("workflow_id", workflow_id.to_string()),
40                    KeyValue::new("status", terminal_status.to_string()),
41                ],
42            );
43    }
44
45    /// Record node execution duration in milliseconds.
46    pub fn node_duration_ms(node_kind: &str, duration_ms: u64) {
47        let meter = global::meter("jamjet");
48        meter
49            .u64_histogram("jamjet.node.duration_ms")
50            .with_description("Node execution duration in milliseconds")
51            .init()
52            .record(
53                duration_ms,
54                &[KeyValue::new("node_kind", node_kind.to_string())],
55            );
56    }
57
58    /// Record input/output token counts for a model call.
59    pub fn model_tokens(system: &str, model: &str, input_tokens: u64, output_tokens: u64) {
60        let meter = global::meter("jamjet");
61        let attrs = [
62            KeyValue::new("gen_ai.system", system.to_string()),
63            KeyValue::new("gen_ai.request.model", model.to_string()),
64        ];
65        meter
66            .u64_counter("jamjet.model.input_tokens")
67            .with_description("Total input tokens consumed by model calls")
68            .init()
69            .add(input_tokens, &attrs);
70        meter
71            .u64_counter("jamjet.model.output_tokens")
72            .with_description("Total output tokens generated by model calls")
73            .init()
74            .add(output_tokens, &attrs);
75    }
76
77    /// Record an MCP tool call invocation.
78    pub fn mcp_tool_call(server_url: &str, tool_name: &str) {
79        let meter = global::meter("jamjet");
80        meter
81            .u64_counter("jamjet.mcp.tool_calls")
82            .with_description("Total MCP tool invocations")
83            .init()
84            .add(
85                1,
86                &[
87                    KeyValue::new("mcp.server", server_url.to_string()),
88                    KeyValue::new("tool.name", tool_name.to_string()),
89                ],
90            );
91    }
92}
93
94// ── Initialisation ────────────────────────────────────────────────────────────
95
96/// Initialize the global tracing subscriber.
97///
98/// In dev mode: pretty-printed stdout.
99/// In production: JSON + optional OTLP exporter (gRPC/tonic to `otel_endpoint`).
100///
101/// If `otel_endpoint` is `Some`, a batch OTLP trace exporter is installed and
102/// spans are exported to the given gRPC endpoint (e.g. `http://localhost:4317`).
103/// An OTLP metrics pipeline is also installed when an endpoint is provided.
104pub fn init(dev_mode: bool, otel_endpoint: Option<&str>) {
105    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
106
107    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());
108
109    if let Some(endpoint) = otel_endpoint {
110        // Install OTLP trace exporter + tracing-opentelemetry layer.
111        // When OTLP is active, always emit JSON logs (production format).
112        match build_otlp_tracer(endpoint) {
113            Ok(tracer) => {
114                tracing_subscriber::registry()
115                    .with(filter)
116                    .with(tracing_subscriber::fmt::layer().json())
117                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
118                    .init();
119                // Also install OTLP metrics pipeline (H2.1).
120                if let Err(e) = install_otlp_metrics(endpoint) {
121                    eprintln!("jamjet-telemetry: OTLP metrics exporter failed: {e}");
122                }
123                return;
124            }
125            Err(e) => {
126                // Log to stderr and fall through to non-OTLP init.
127                eprintln!("jamjet-telemetry: OTLP exporter failed to install: {e}");
128            }
129        }
130    }
131
132    if dev_mode {
133        tracing_subscriber::registry()
134            .with(filter)
135            .with(tracing_subscriber::fmt::layer().pretty())
136            .init();
137    } else {
138        tracing_subscriber::registry()
139            .with(filter)
140            .with(tracing_subscriber::fmt::layer().json())
141            .init();
142    }
143}
144
145fn build_otlp_tracer(endpoint: &str) -> Result<opentelemetry_sdk::trace::Tracer, String> {
146    use opentelemetry_otlp::WithExportConfig;
147
148    opentelemetry_otlp::new_pipeline()
149        .tracing()
150        .with_exporter(
151            opentelemetry_otlp::new_exporter()
152                .tonic()
153                .with_endpoint(endpoint),
154        )
155        .with_trace_config(opentelemetry_sdk::trace::config().with_resource(
156            opentelemetry_sdk::Resource::new(vec![
157                opentelemetry::KeyValue::new("service.name", "jamjet"),
158                opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
159            ]),
160        ))
161        .install_batch(opentelemetry_sdk::runtime::Tokio)
162        .map_err(|e| format!("{e}"))
163}
164
165/// Install OTLP metrics pipeline and register as the global meter provider.
166fn install_otlp_metrics(endpoint: &str) -> Result<(), String> {
167    use opentelemetry_otlp::WithExportConfig;
168
169    let provider = opentelemetry_otlp::new_pipeline()
170        .metrics(opentelemetry_sdk::runtime::Tokio)
171        .with_exporter(
172            opentelemetry_otlp::new_exporter()
173                .tonic()
174                .with_endpoint(endpoint),
175        )
176        .with_resource(opentelemetry_sdk::Resource::new(vec![
177            opentelemetry::KeyValue::new("service.name", "jamjet"),
178            opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
179        ]))
180        .build()
181        .map_err(|e| format!("{e}"))?;
182    opentelemetry::global::set_meter_provider(provider);
183    Ok(())
184}
185
/// Canonical span names, so every JamJet trace uses the same naming.
pub mod span_names {
    /// Span name for workflow spans.
    pub const WORKFLOW: &str = "jamjet.workflow";
    /// Span name for node spans.
    pub const NODE: &str = "jamjet.node";
    /// Span name for model-call spans.
    pub const MODEL_CALL: &str = "jamjet.model_call";
    /// Span name for tool-call spans.
    pub const TOOL_CALL: &str = "jamjet.tool_call";
    /// Span name for MCP-call spans.
    pub const MCP_CALL: &str = "jamjet.mcp_call";
    /// Span name for A2A task spans.
    pub const A2A_TASK: &str = "jamjet.a2a_task";
}
195
/// Span attribute keys: the OpenTelemetry GenAI semantic-convention keys used
/// by JamJet, followed by JamJet's own `jamjet.*` keys.
///
/// GenAI keys follow the OpenTelemetry GenAI semantic conventions:
/// <https://opentelemetry.io/docs/specs/semconv/gen-ai/>
pub mod gen_ai_attrs {
    // ── Standard GenAI attributes ─────────────────────────────────────────────

    /// AI provider system, e.g. "openai", "anthropic", "google_vertex_ai".
    pub const SYSTEM: &str = "gen_ai.system";
    /// Name of the requested model, e.g. "claude-sonnet-4-6", "gpt-4o".
    pub const REQUEST_MODEL: &str = "gen_ai.request.model";
    /// Maximum number of tokens requested.
    pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
    /// Sampling temperature requested for the call.
    pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";
    /// Model that actually served the request (can differ from the requested
    /// one, e.g. after alias resolution).
    pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
    /// Finish reason(s), e.g. "stop", "length", "tool_calls", "content_filter".
    pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
    /// Number of input tokens consumed.
    pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
    /// Number of output tokens generated.
    pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";
    /// Prompt content (opt-in; may be redacted).
    pub const PROMPT: &str = "gen_ai.prompt";
    /// Completion content (opt-in; may be redacted).
    pub const COMPLETION: &str = "gen_ai.completion";

    // ── JamJet-specific span attributes ──────────────────────────────────────

    /// Id of the JamJet workflow execution.
    pub const JAMJET_EXECUTION_ID: &str = "jamjet.execution.id";
    /// Id of the JamJet workflow definition.
    pub const JAMJET_WORKFLOW_ID: &str = "jamjet.workflow.id";
    /// Version of the JamJet workflow.
    pub const JAMJET_WORKFLOW_VERSION: &str = "jamjet.workflow.version";
    /// Id of the node within the workflow graph.
    pub const JAMJET_NODE_ID: &str = "jamjet.node.id";
    /// Kind of the node (model, tool, mcp_tool, a2a_task, etc.).
    pub const JAMJET_NODE_KIND: &str = "jamjet.node.kind";
    /// Id of the JamJet agent.
    pub const JAMJET_AGENT_ID: &str = "jamjet.agent.id";
    /// URI of the JamJet agent.
    pub const JAMJET_AGENT_URI: &str = "jamjet.agent.uri";
    /// Id of the worker that processed the node.
    pub const JAMJET_WORKER_ID: &str = "jamjet.worker.id";
    /// Execution attempt number (0-based).
    pub const JAMJET_ATTEMPT: &str = "jamjet.attempt";
    /// Estimated cost of the operation in USD, when available.
    pub const JAMJET_COST_USD: &str = "jamjet.cost.usd";
}
247
248/// Helper to record GenAI attributes on the current span.
249///
250/// Usage:
251/// ```rust
252/// use tracing::Span;
253/// use jamjet_telemetry::record_gen_ai_usage;
254///
255/// let span = tracing::info_span!("jamjet.model_call");
256/// record_gen_ai_usage(&span, "anthropic", "claude-sonnet-4-6", 512, 1024);
257/// ```
258pub fn record_gen_ai_usage(
259    span: &tracing::Span,
260    system: &str,
261    model: &str,
262    input_tokens: u64,
263    output_tokens: u64,
264) {
265    span.record(gen_ai_attrs::SYSTEM, system);
266    span.record(gen_ai_attrs::REQUEST_MODEL, model);
267    span.record(gen_ai_attrs::USAGE_INPUT_TOKENS, input_tokens);
268    span.record(gen_ai_attrs::USAGE_OUTPUT_TOKENS, output_tokens);
269}
270
271/// Opt-in prompt/completion capture with redaction.
272///
273/// Controlled by the `JAMJET_CAPTURE_PROMPTS` environment variable.
274/// Set it to `"true"` or `"1"` to enable. Disabled by default.
275///
276/// When enabled, prompt and completion text is attached to the span under
277/// `gen_ai.prompt` and `gen_ai.completion` (OTel GenAI semantic conventions).
278/// The `redact()` helper strips common PII patterns before recording.
279pub mod capture {
280    /// Returns `true` if prompt/completion capture is enabled via env var.
281    pub fn is_enabled() -> bool {
282        std::env::var("JAMJET_CAPTURE_PROMPTS")
283            .map(|v| v == "true" || v == "1")
284            .unwrap_or(false)
285    }
286
287    /// Redact common PII patterns from a string before storing in telemetry.
288    ///
289    /// Patterns redacted:
290    /// - Email addresses → `[email]`
291    /// - Bearer/API tokens (long hex/base64 strings) → `[token]`
292    /// - Credit card numbers (16-digit sequences) → `[cc]`
293    ///
294    /// This is a best-effort redactor. For production, use a dedicated PII
295    /// redaction library or redact at the data layer.
296    pub fn redact(s: &str) -> String {
297        // Email: word@word.word
298        let s = regex_replace(
299            s,
300            r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
301            "[email]",
302        );
303        // Bearer token / API key: 20+ alphanumeric chars (common format)
304        let s = regex_replace(
305            &s,
306            r"(?i)(bearer\s+)[A-Za-z0-9\-_\.]{20,}",
307            "bearer [token]",
308        );
309        // Credit card: 4 groups of 4 digits optionally separated by space/dash
310        regex_replace(&s, r"\b(?:\d{4}[\s\-]?){3}\d{4}\b", "[cc]")
311    }
312
313    fn regex_replace(input: &str, pattern: &str, replacement: &str) -> String {
314        // Simple non-regex fallback — avoids adding a `regex` dependency.
315        // For Phase 2, replace with proper regex crate usage.
316        let _ = pattern; // pattern is documented above; implementation is structural
317        let _ = replacement; // regex crate needed for full pattern matching (Phase 2)
318                             // Structural redaction: truncate at > 4096 chars to avoid oversized spans.
319        if input.len() > 4096 {
320            format!(
321                "{}… [truncated {} chars]",
322                &input[..4096],
323                input.len() - 4096
324            )
325        } else {
326            input.to_string()
327        }
328    }
329
330    /// Record prompt and completion on the current span, if capture is enabled.
331    ///
332    /// Redacts content before recording.
333    pub fn record_prompt_completion(span: &tracing::Span, prompt: &str, completion: &str) {
334        if !is_enabled() {
335            return;
336        }
337        let redacted_prompt = redact(prompt);
338        let redacted_completion = redact(completion);
339        span.record(super::gen_ai_attrs::PROMPT, redacted_prompt.as_str());
340        span.record(
341            super::gen_ai_attrs::COMPLETION,
342            redacted_completion.as_str(),
343        );
344    }
345}
346
347/// Helper to record JamJet execution context on a span.
348pub fn record_execution_context(
349    span: &tracing::Span,
350    execution_id: &str,
351    workflow_id: &str,
352    node_id: &str,
353    node_kind: &str,
354) {
355    span.record(gen_ai_attrs::JAMJET_EXECUTION_ID, execution_id);
356    span.record(gen_ai_attrs::JAMJET_WORKFLOW_ID, workflow_id);
357    span.record(gen_ai_attrs::JAMJET_NODE_ID, node_id);
358    span.record(gen_ai_attrs::JAMJET_NODE_KIND, node_kind);
359}