1use greentic_types::telemetry::{attr_keys, set_current_tenant_ctx};
2use greentic_types::{EnvId, TenantCtx, TenantId};
3use rand::{RngExt, rng};
4use std::str::FromStr;
5use tracing::Span;
6
/// Provider identifier that [`tenant_context`] falls back to when the caller
/// passes no explicit `provider_id`.
pub const PROVIDER_ID: &str = "greentic-runner";
8
/// Borrowed set of flow attributes that [`annotate_span`] records on a span.
///
/// Every value is a string slice borrowed for `'a`; the struct owns nothing.
#[derive(Clone, Debug)]
pub struct FlowSpanAttributes<'a> {
    /// Tenant identifier, recorded under the `tenant` span field.
    pub tenant: &'a str,
    /// Flow identifier, recorded under the `flow_id` span field.
    pub flow_id: &'a str,
    /// Node identifier, recorded under `node_id` only when present.
    pub node_id: Option<&'a str>,
    /// Tool name, recorded under `tool` only when present.
    pub tool: Option<&'a str>,
    /// Action name, recorded under `action` only when present.
    pub action: Option<&'a str>,
}
17
18pub fn annotate_span(span: &Span, attrs: &FlowSpanAttributes<'_>) {
19 span.record("tenant", attrs.tenant);
20 span.record("flow_id", attrs.flow_id);
21 if let Some(node) = attrs.node_id {
22 span.record("node_id", node);
23 }
24 if let Some(tool) = attrs.tool {
25 span.record("tool", tool);
26 }
27 if let Some(action) = attrs.action {
28 span.record("action", action);
29 }
30}
31
32pub fn tenant_context(
33 env: &str,
34 tenant: &str,
35 flow_id: Option<&str>,
36 node_id: Option<&str>,
37 provider_id: Option<&str>,
38 session_id: Option<&str>,
39) -> TenantCtx {
40 let env_id = EnvId::from_str(env).expect("invalid env id");
41 let tenant_id = TenantId::from_str(tenant).expect("invalid tenant id");
42 let mut ctx = TenantCtx::new(env_id, tenant_id);
43 let provider = provider_id.unwrap_or(PROVIDER_ID);
44 ctx = ctx.with_provider(provider.to_string());
45 if let Some(flow) = flow_id {
46 ctx = ctx.with_flow(flow.to_string());
47 }
48 if let Some(node) = node_id {
49 ctx = ctx.with_node(node.to_string());
50 }
51 if let Some(session) = session_id {
52 ctx = ctx.with_session(session.to_string());
53 }
54 ctx
55}
56
57#[allow(clippy::too_many_arguments)]
65fn flow_tenant_ctx(
66 env: &str,
67 tenant: &str,
68 flow_id: &str,
69 node_id: Option<&str>,
70 provider_id: Option<&str>,
71 session_id: Option<&str>,
72 pack_id: &str,
73 rollout: &RolloutIds,
74) -> TenantCtx {
75 let mut ctx = tenant_context(env, tenant, Some(flow_id), node_id, provider_id, session_id);
76 if !pack_id.is_empty() {
77 ctx.attributes
78 .insert(attr_keys::PACK_ID.to_string(), pack_id.to_string());
79 }
80 stamp_rollout_ids(&mut ctx, rollout);
81 ctx
82}
83
84#[allow(clippy::too_many_arguments)]
95pub fn set_flow_context(
96 span: &Span,
97 env: &str,
98 tenant: &str,
99 flow_id: &str,
100 node_id: Option<&str>,
101 provider_id: Option<&str>,
102 session_id: Option<&str>,
103 pack_id: &str,
104 rollout: &RolloutIds,
105) {
106 let ctx = flow_tenant_ctx(
107 env,
108 tenant,
109 flow_id,
110 node_id,
111 provider_id,
112 session_id,
113 pack_id,
114 rollout,
115 );
116 set_current_tenant_ctx(&ctx);
117 #[cfg(feature = "telemetry")]
118 greentic_telemetry::annotate_span(
119 span,
120 &greentic_types::telemetry::tenant_ctx_to_telemetry(&ctx),
121 );
122 #[cfg(not(feature = "telemetry"))]
123 let _ = span;
124}
125
/// Optional rollout identifiers that [`stamp_rollout_ids`] writes into a
/// tenant context's attributes.
///
/// Every field defaults to `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RolloutIds {
    /// Stored under `attr_keys::CUSTOMER_ID` when present.
    pub customer_id: Option<String>,
    /// Stored under `attr_keys::DEPLOYMENT_ID` when present.
    pub deployment_id: Option<String>,
    /// Stored under `attr_keys::BUNDLE_ID` when present.
    pub bundle_id: Option<String>,
    /// Stored under `attr_keys::REVISION_ID` when present.
    pub revision_id: Option<String>,
}

impl RolloutIds {
    /// Returns `true` when none of the four identifiers is set.
    ///
    /// A field holding `Some("")` counts as set.
    pub fn is_empty(&self) -> bool {
        let fields = [
            &self.customer_id,
            &self.deployment_id,
            &self.bundle_id,
            &self.revision_id,
        ];
        fields.iter().all(|id| id.is_none())
    }
}
148
149pub fn stamp_rollout_ids(ctx: &mut TenantCtx, ids: &RolloutIds) {
158 set_or_clear(ctx, attr_keys::CUSTOMER_ID, ids.customer_id.as_deref());
159 set_or_clear(ctx, attr_keys::DEPLOYMENT_ID, ids.deployment_id.as_deref());
160 set_or_clear(ctx, attr_keys::BUNDLE_ID, ids.bundle_id.as_deref());
161 set_or_clear(ctx, attr_keys::REVISION_ID, ids.revision_id.as_deref());
162}
163
164fn set_or_clear(ctx: &mut TenantCtx, key: &str, value: Option<&str>) {
165 match value {
166 Some(v) => {
167 ctx.attributes.insert(key.to_string(), v.to_string());
168 }
169 None => {
170 ctx.attributes.remove(key);
171 }
172 }
173}
174
175pub fn backoff_delay_ms(base: u64, attempt: u32) -> u64 {
176 let multiplier = 1_u64 << attempt.min(10);
177 let exp = base.saturating_mul(multiplier);
178 let mut rng = rng();
179 let jitter = rng.random_range(0..=exp.min(1000));
180 exp + jitter
181}
182
#[cfg(test)]
mod tests {
    use super::*;

    /// Bare tenant context with no flow, node, provider override or session.
    fn ctx() -> TenantCtx {
        tenant_context("prod-eu", "acme", None, None, None, None)
    }

    /// Rollout ids with all four identifiers populated.
    fn all_ids() -> RolloutIds {
        RolloutIds {
            customer_id: Some("cust-acme".into()),
            deployment_id: Some("01JTKS".into()),
            bundle_id: Some("customer.support".into()),
            revision_id: Some("01JTKR".into()),
        }
    }

    /// Looks up an attribute on `ctx` as a string slice.
    fn attr<'a>(ctx: &'a TenantCtx, key: &str) -> Option<&'a str> {
        ctx.attributes.get(key).map(String::as_str)
    }

    /// Flow context for the `support` flow with only pack id and rollout varying.
    fn support_flow_ctx(pack_id: &str, rollout: &RolloutIds) -> TenantCtx {
        flow_tenant_ctx(
            "prod-eu", "acme", "support", None, None, None, pack_id, rollout,
        )
    }

    #[test]
    fn stamp_sets_present_ids_under_canonical_keys() {
        let mut c = ctx();
        stamp_rollout_ids(&mut c, &all_ids());

        assert_eq!(attr(&c, attr_keys::CUSTOMER_ID), Some("cust-acme"));
        assert_eq!(attr(&c, attr_keys::DEPLOYMENT_ID), Some("01JTKS"));
        assert_eq!(attr(&c, attr_keys::BUNDLE_ID), Some("customer.support"));
        assert_eq!(attr(&c, attr_keys::REVISION_ID), Some("01JTKR"));
    }

    #[test]
    fn stamp_empty_is_noop() {
        let mut c = ctx();
        let before = c.attributes.len();
        stamp_rollout_ids(&mut c, &RolloutIds::default());
        assert_eq!(c.attributes.len(), before);
        assert!(RolloutIds::default().is_empty());
    }

    #[test]
    fn stamp_only_sets_present_subset() {
        let mut c = ctx();
        let only_deployment = RolloutIds {
            deployment_id: Some("01JTKS".into()),
            ..Default::default()
        };
        stamp_rollout_ids(&mut c, &only_deployment);
        assert!(c.attributes.contains_key(attr_keys::DEPLOYMENT_ID));
        assert!(!c.attributes.contains_key(attr_keys::CUSTOMER_ID));
    }

    #[test]
    fn flow_ctx_stamps_pack_id_live() {
        let ctx = support_flow_ctx("customer.support@1.2.0", &RolloutIds::default());
        assert_eq!(
            attr(&ctx, attr_keys::PACK_ID),
            Some("customer.support@1.2.0")
        );
        // No rollout ids were supplied, so none may appear.
        assert!(!ctx.attributes.contains_key(attr_keys::REVISION_ID));
    }

    #[test]
    fn flow_ctx_stamps_pack_id_and_rollout_ids() {
        let ctx = support_flow_ctx("customer.support@1.2.0", &all_ids());
        assert_eq!(
            attr(&ctx, attr_keys::PACK_ID),
            Some("customer.support@1.2.0")
        );
        assert_eq!(attr(&ctx, attr_keys::REVISION_ID), Some("01JTKR"));
        assert_eq!(attr(&ctx, attr_keys::DEPLOYMENT_ID), Some("01JTKS"));
    }

    #[test]
    fn flow_ctx_skips_empty_pack_id() {
        let ctx = support_flow_ctx("", &RolloutIds::default());
        assert!(!ctx.attributes.contains_key(attr_keys::PACK_ID));
    }

    #[test]
    fn stamp_clears_stale_ids_on_restamp() {
        let mut c = ctx();
        stamp_rollout_ids(&mut c, &all_ids());

        // Second stamp carries only a revision; the other three must vanish.
        let revision_only = RolloutIds {
            revision_id: Some("01JTKZ".into()),
            ..Default::default()
        };
        stamp_rollout_ids(&mut c, &revision_only);

        assert_eq!(attr(&c, attr_keys::REVISION_ID), Some("01JTKZ"));
        assert!(!c.attributes.contains_key(attr_keys::CUSTOMER_ID));
        assert!(!c.attributes.contains_key(attr_keys::DEPLOYMENT_ID));
        assert!(!c.attributes.contains_key(attr_keys::BUNDLE_ID));
    }
}
344
#[cfg(all(test, feature = "telemetry"))]
mod export_tests {
    use super::*;
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_sdk::error::OTelSdkResult;
    use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, SpanExporter};
    use std::sync::{Arc, Mutex};
    use tracing::subscriber;
    use tracing_subscriber::prelude::*;

    /// Exporter that appends every exported batch to a shared in-memory vector.
    #[derive(Clone, Debug)]
    struct TestExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl SpanExporter for TestExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            let sink = Arc::clone(&self.spans);
            async move {
                // A poisoned lock still yields its data; keep collecting.
                sink.lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner())
                    .extend(batch);
                Ok(())
            }
        }
    }

    /// Returns the stringified value of the first attribute named `key`.
    fn attr_value(span: &SpanData, key: &str) -> Option<String> {
        span.attributes
            .iter()
            .find_map(|kv| (kv.key.as_str() == key).then(|| kv.value.to_string()))
    }

    #[test]
    fn set_flow_context_exports_pack_id_and_rollout_ids() {
        let exported = Arc::new(Mutex::new(Vec::new()));
        let exporter = TestExporter {
            spans: Arc::clone(&exported),
        };
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter)
            .build();
        let tracer = provider.tracer("c5.4-flow-export");
        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        let registry = tracing_subscriber::registry().with(otel_layer);
        let _guard = subscriber::set_default(registry);

        let flow_span = tracing::info_span!("flow.execute");
        let rollout = RolloutIds {
            customer_id: Some("cust-acme".into()),
            deployment_id: Some("01JTKS".into()),
            bundle_id: Some("customer.support".into()),
            revision_id: Some("01JTKR".into()),
        };
        set_flow_context(
            &flow_span,
            "prod-eu",
            "acme",
            "support",
            None,
            None,
            Some("sess-1"),
            "customer.support@1.2.0",
            &rollout,
        );
        // Enter and immediately leave the span, then drop it.
        {
            let _entered = flow_span.enter();
        }
        drop(flow_span);

        let _ = provider.force_flush();
        let _ = provider.shutdown();

        let collected = exported.lock().unwrap_or_else(|e| e.into_inner());
        let span = collected
            .iter()
            .find(|s| s.name == "flow.execute")
            .expect("flow.execute span exported");
        assert_eq!(
            attr_value(span, attr_keys::PACK_ID).as_deref(),
            Some("customer.support@1.2.0"),
            "pack_id must reach the exported span"
        );
        assert_eq!(
            attr_value(span, attr_keys::REVISION_ID).as_deref(),
            Some("01JTKR")
        );
        assert_eq!(
            attr_value(span, attr_keys::DEPLOYMENT_ID).as_deref(),
            Some("01JTKS")
        );
        assert_eq!(
            attr_value(span, attr_keys::BUNDLE_ID).as_deref(),
            Some("customer.support")
        );
        assert_eq!(
            attr_value(span, attr_keys::CUSTOMER_ID).as_deref(),
            Some("cust-acme")
        );
    }
}