Skip to main content

greentic_runner_host/
telemetry.rs

1use greentic_types::telemetry::{attr_keys, set_current_tenant_ctx};
2use greentic_types::{EnvId, TenantCtx, TenantId};
3use rand::{RngExt, rng};
4use std::str::FromStr;
5use tracing::Span;
6
/// Provider identifier that [`tenant_context`] applies when the caller does
/// not pass an explicit `provider_id`.
pub const PROVIDER_ID: &str = "greentic-runner";
8
/// Borrowed attribute values that [`annotate_span`] records onto a tracing
/// span.
///
/// `tenant` and `flow_id` are always recorded; the optional fields are only
/// recorded when they are `Some`.
#[derive(Clone, Debug)]
pub struct FlowSpanAttributes<'a> {
    /// Value recorded under the `tenant` span field.
    pub tenant: &'a str,
    /// Value recorded under the `flow_id` span field.
    pub flow_id: &'a str,
    /// Value recorded under the `node_id` span field, when present.
    pub node_id: Option<&'a str>,
    /// Value recorded under the `tool` span field, when present.
    pub tool: Option<&'a str>,
    /// Value recorded under the `action` span field, when present.
    pub action: Option<&'a str>,
}
17
18pub fn annotate_span(span: &Span, attrs: &FlowSpanAttributes<'_>) {
19    span.record("tenant", attrs.tenant);
20    span.record("flow_id", attrs.flow_id);
21    if let Some(node) = attrs.node_id {
22        span.record("node_id", node);
23    }
24    if let Some(tool) = attrs.tool {
25        span.record("tool", tool);
26    }
27    if let Some(action) = attrs.action {
28        span.record("action", action);
29    }
30}
31
32pub fn tenant_context(
33    env: &str,
34    tenant: &str,
35    flow_id: Option<&str>,
36    node_id: Option<&str>,
37    provider_id: Option<&str>,
38    session_id: Option<&str>,
39) -> TenantCtx {
40    let env_id = EnvId::from_str(env).expect("invalid env id");
41    let tenant_id = TenantId::from_str(tenant).expect("invalid tenant id");
42    let mut ctx = TenantCtx::new(env_id, tenant_id);
43    let provider = provider_id.unwrap_or(PROVIDER_ID);
44    ctx = ctx.with_provider(provider.to_string());
45    if let Some(flow) = flow_id {
46        ctx = ctx.with_flow(flow.to_string());
47    }
48    if let Some(node) = node_id {
49        ctx = ctx.with_node(node.to_string());
50    }
51    if let Some(session) = session_id {
52        ctx = ctx.with_session(session.to_string());
53    }
54    ctx
55}
56
57/// Build the per-invocation tenant context for a flow execution, stamping the
58/// resolved `pack_id` and any rollout identifiers onto `attributes`.
59///
60/// `pack_id` is always live (the engine knows it per invocation). The rollout
61/// IDs come from the engine's owning revision-keyed runtime and are empty until
62/// the Phase-D revision dispatcher constructs revision runtimes. Pure so the
63/// stamping can be unit-tested without the task-local slot.
64#[allow(clippy::too_many_arguments)]
65fn flow_tenant_ctx(
66    env: &str,
67    tenant: &str,
68    flow_id: &str,
69    node_id: Option<&str>,
70    provider_id: Option<&str>,
71    session_id: Option<&str>,
72    pack_id: &str,
73    rollout: &RolloutIds,
74) -> TenantCtx {
75    let mut ctx = tenant_context(env, tenant, Some(flow_id), node_id, provider_id, session_id);
76    if !pack_id.is_empty() {
77        ctx.attributes
78            .insert(attr_keys::PACK_ID.to_string(), pack_id.to_string());
79    }
80    stamp_rollout_ids(&mut ctx, rollout);
81    ctx
82}
83
84/// Install per-invocation telemetry for `span`: stamp the tenant context (live
85/// `pack_id` + rollout IDs) into the task-local slot, and export the `gt.*`
86/// attribution as real OpenTelemetry attributes on `span`.
87///
88/// The export step is load-bearing: greentic-telemetry's `ContextLayer` only
89/// stashes the task-local context in span extensions for capture layers — it
90/// does NOT record `gt.*` onto exported spans (`Span::record` is a no-op for
91/// callsite-unknown fields). The supported export primitive is
92/// `greentic_telemetry::annotate_span`, which sets the attributes directly on
93/// the owned span handle; without it the IDs never reach exported telemetry.
94#[allow(clippy::too_many_arguments)]
95pub fn set_flow_context(
96    span: &Span,
97    env: &str,
98    tenant: &str,
99    flow_id: &str,
100    node_id: Option<&str>,
101    provider_id: Option<&str>,
102    session_id: Option<&str>,
103    pack_id: &str,
104    rollout: &RolloutIds,
105) {
106    let ctx = flow_tenant_ctx(
107        env,
108        tenant,
109        flow_id,
110        node_id,
111        provider_id,
112        session_id,
113        pack_id,
114        rollout,
115    );
116    set_current_tenant_ctx(&ctx);
117    #[cfg(feature = "telemetry")]
118    greentic_telemetry::annotate_span(
119        span,
120        &greentic_types::telemetry::tenant_ctx_to_telemetry(&ctx),
121    );
122    #[cfg(not(feature = "telemetry"))]
123    let _ = span;
124}
125
/// Deploy-spec rollout identifiers stamped onto the per-invocation
/// [`TenantCtx`] for telemetry attribution (B11).
///
/// Every field is optional. The producer (the revision dispatcher resolving a
/// deployment/revision) is Phase D, so today all fields are `None`; passing
/// such a value to [`stamp_rollout_ids`] writes nothing and only clears the
/// four rollout keys.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RolloutIds {
    /// Customer identifier, if known.
    pub customer_id: Option<String>,
    /// Deployment identifier, if known.
    pub deployment_id: Option<String>,
    /// Bundle identifier, if known.
    pub bundle_id: Option<String>,
    /// Revision identifier, if known.
    pub revision_id: Option<String>,
}

impl RolloutIds {
    /// Returns `true` when none of the four identifiers is set (the common
    /// case until Phase D wires the dispatcher producer).
    pub fn is_empty(&self) -> bool {
        let fields = [
            &self.customer_id,
            &self.deployment_id,
            &self.bundle_id,
            &self.revision_id,
        ];
        fields.iter().all(|id| id.is_none())
    }
}
148
149/// Stamp the rollout IDs onto `ctx.attributes` under the canonical
150/// [`attr_keys`](greentic_types::telemetry::attr_keys), so the telemetry bridge
151/// (`set_current_tenant_ctx`) copies them into `TelemetryCtx` for spans/logs.
152///
153/// Authoritative over these four keys: a present ID is written, an absent one
154/// is cleared. Stamping is therefore safe to re-run on a reused `TenantCtx`
155/// (e.g. a session that migrates between revisions) — an ID dropped on a
156/// re-stamp won't linger as a stale attribute from an earlier stamp.
157pub fn stamp_rollout_ids(ctx: &mut TenantCtx, ids: &RolloutIds) {
158    set_or_clear(ctx, attr_keys::CUSTOMER_ID, ids.customer_id.as_deref());
159    set_or_clear(ctx, attr_keys::DEPLOYMENT_ID, ids.deployment_id.as_deref());
160    set_or_clear(ctx, attr_keys::BUNDLE_ID, ids.bundle_id.as_deref());
161    set_or_clear(ctx, attr_keys::REVISION_ID, ids.revision_id.as_deref());
162}
163
164fn set_or_clear(ctx: &mut TenantCtx, key: &str, value: Option<&str>) {
165    match value {
166        Some(v) => {
167            ctx.attributes.insert(key.to_string(), v.to_string());
168        }
169        None => {
170            ctx.attributes.remove(key);
171        }
172    }
173}
174
175pub fn backoff_delay_ms(base: u64, attempt: u32) -> u64 {
176    let multiplier = 1_u64 << attempt.min(10);
177    let exp = base.saturating_mul(multiplier);
178    let mut rng = rng();
179    let jitter = rng.random_range(0..=exp.min(1000));
180    exp + jitter
181}
182
/// Unit tests for rollout/pack stamping on the pure context builders; none of
/// them touch the task-local slot.
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh context with no flow, node, provider override, or session.
    fn base_ctx() -> TenantCtx {
        tenant_context("prod-eu", "acme", None, None, None, None)
    }

    /// Rollout IDs with all four identifiers populated.
    fn full_ids() -> RolloutIds {
        RolloutIds {
            customer_id: Some("cust-acme".into()),
            deployment_id: Some("01JTKS".into()),
            bundle_id: Some("customer.support".into()),
            revision_id: Some("01JTKR".into()),
        }
    }

    /// Flow context for the `support` flow with the given pack id / rollout.
    fn flow_ctx(pack_id: &str, rollout: &RolloutIds) -> TenantCtx {
        flow_tenant_ctx(
            "prod-eu", "acme", "support", None, None, None, pack_id, rollout,
        )
    }

    /// Look up an attribute as `&str`.
    fn attr<'a>(ctx: &'a TenantCtx, key: &str) -> Option<&'a str> {
        ctx.attributes.get(key).map(String::as_str)
    }

    #[test]
    fn stamp_sets_present_ids_under_canonical_keys() {
        let mut stamped = base_ctx();
        stamp_rollout_ids(&mut stamped, &full_ids());

        assert_eq!(attr(&stamped, attr_keys::CUSTOMER_ID), Some("cust-acme"));
        assert_eq!(attr(&stamped, attr_keys::DEPLOYMENT_ID), Some("01JTKS"));
        assert_eq!(
            attr(&stamped, attr_keys::BUNDLE_ID),
            Some("customer.support")
        );
        assert_eq!(attr(&stamped, attr_keys::REVISION_ID), Some("01JTKR"));
    }

    #[test]
    fn stamp_empty_is_noop() {
        let mut stamped = base_ctx();
        let count_before = stamped.attributes.len();
        stamp_rollout_ids(&mut stamped, &RolloutIds::default());
        assert_eq!(stamped.attributes.len(), count_before);
        assert!(RolloutIds::default().is_empty());
    }

    #[test]
    fn stamp_only_sets_present_subset() {
        let mut stamped = base_ctx();
        let deployment_only = RolloutIds {
            deployment_id: Some("01JTKS".into()),
            ..Default::default()
        };
        stamp_rollout_ids(&mut stamped, &deployment_only);
        assert!(stamped.attributes.contains_key(attr_keys::DEPLOYMENT_ID));
        assert!(!stamped.attributes.contains_key(attr_keys::CUSTOMER_ID));
    }

    #[test]
    fn flow_ctx_stamps_pack_id_live() {
        let built = flow_ctx("customer.support@1.2.0", &RolloutIds::default());
        assert_eq!(
            attr(&built, attr_keys::PACK_ID),
            Some("customer.support@1.2.0")
        );
        // No revision runtime today, so the rollout IDs are absent.
        assert!(!built.attributes.contains_key(attr_keys::REVISION_ID));
    }

    #[test]
    fn flow_ctx_stamps_pack_id_and_rollout_ids() {
        let built = flow_ctx("customer.support@1.2.0", &full_ids());
        assert_eq!(
            attr(&built, attr_keys::PACK_ID),
            Some("customer.support@1.2.0")
        );
        assert_eq!(attr(&built, attr_keys::REVISION_ID), Some("01JTKR"));
        assert_eq!(attr(&built, attr_keys::DEPLOYMENT_ID), Some("01JTKS"));
    }

    #[test]
    fn flow_ctx_skips_empty_pack_id() {
        let built = flow_ctx("", &RolloutIds::default());
        assert!(!built.attributes.contains_key(attr_keys::PACK_ID));
    }

    #[test]
    fn stamp_clears_stale_ids_on_restamp() {
        let mut stamped = base_ctx();
        stamp_rollout_ids(&mut stamped, &full_ids());

        // Re-stamp with only a new revision (e.g. a session migrating
        // revisions): the other three IDs must be cleared rather than left
        // stale from the first stamp.
        let revision_only = RolloutIds {
            revision_id: Some("01JTKZ".into()),
            ..Default::default()
        };
        stamp_rollout_ids(&mut stamped, &revision_only);

        assert_eq!(attr(&stamped, attr_keys::REVISION_ID), Some("01JTKZ"));
        for cleared in [
            attr_keys::CUSTOMER_ID,
            attr_keys::DEPLOYMENT_ID,
            attr_keys::BUNDLE_ID,
        ] {
            assert!(!stamped.attributes.contains_key(cleared));
        }
    }
}
344
/// End-to-end export regression (C5.4): verifies that the stamped `gt.*`
/// attribution reaches an **exported** OTLP span, not merely the task-local
/// slot.
///
/// The failure mode guarded against: `set_current_tenant_ctx` is called but no
/// `annotate_span` export happens, so production spans omit pack/rollout
/// attribution while the pure-context tests still pass.
#[cfg(all(test, feature = "telemetry"))]
mod export_tests {
    use super::*;
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_sdk::error::OTelSdkResult;
    use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, SpanExporter};
    use std::sync::{Arc, Mutex};
    use tracing::subscriber;
    use tracing_subscriber::prelude::*;

    /// Exporter that appends every exported span to a shared in-memory buffer.
    #[derive(Clone, Debug)]
    struct TestExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl SpanExporter for TestExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            let sink = Arc::clone(&self.spans);
            async move {
                // A poisoned lock still yields the buffer; keep collecting.
                sink.lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner())
                    .extend(batch);
                Ok(())
            }
        }
    }

    /// String form of the first attribute on `span` whose key equals `key`.
    fn attr_value(span: &SpanData, key: &str) -> Option<String> {
        span.attributes
            .iter()
            .find_map(|kv| (kv.key.as_str() == key).then(|| kv.value.to_string()))
    }

    #[test]
    fn set_flow_context_exports_pack_id_and_rollout_ids() {
        let collected = Arc::new(Mutex::new(Vec::new()));
        let exporter = TestExporter {
            spans: collected.clone(),
        };
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter)
            .build();
        let tracer = provider.tracer("c5.4-flow-export");
        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        let registry = tracing_subscriber::registry().with(otel_layer);
        let _guard = subscriber::set_default(registry);

        // A span whose callsite declares NONE of the gt.* fields — exactly the
        // `flow.execute` situation. `set_flow_context` must export them anyway.
        let flow_span = tracing::info_span!("flow.execute");
        let rollout = RolloutIds {
            customer_id: Some("cust-acme".into()),
            deployment_id: Some("01JTKS".into()),
            bundle_id: Some("customer.support".into()),
            revision_id: Some("01JTKR".into()),
        };
        set_flow_context(
            &flow_span,
            "prod-eu",
            "acme",
            "support",
            None,
            None,
            Some("sess-1"),
            "customer.support@1.2.0",
            &rollout,
        );
        // Enter and immediately exit the span, then close it by dropping it.
        {
            let _entered = flow_span.enter();
        }
        drop(flow_span);

        let _ = provider.force_flush();
        let _ = provider.shutdown();

        let finished = collected.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        // Assert on the named span, not just the last one — a future extra span
        // from the subscriber stack would otherwise yield confusing failures.
        let exported_span = finished
            .iter()
            .find(|candidate| candidate.name == "flow.execute")
            .expect("flow.execute span exported");

        assert_eq!(
            attr_value(exported_span, attr_keys::PACK_ID).as_deref(),
            Some("customer.support@1.2.0"),
            "pack_id must reach the exported span"
        );
        for (key, expected) in [
            (attr_keys::REVISION_ID, "01JTKR"),
            (attr_keys::DEPLOYMENT_ID, "01JTKS"),
            (attr_keys::BUNDLE_ID, "customer.support"),
            (attr_keys::CUSTOMER_ID, "cust-acme"),
        ] {
            assert_eq!(attr_value(exported_span, key).as_deref(), Some(expected));
        }
    }
}