use opentelemetry::baggage::{Baggage, BaggageExt, KeyValueMetadata};
use opentelemetry::global::BoxedSpan;
use opentelemetry::trace::Span;
use opentelemetry::{Context, KeyValue};
use super::attrs;
pub const BAGGAGE_USER_ID: &str = "user.id";
pub const BAGGAGE_SESSION_ID: &str = "session.id";
pub const BAGGAGE_LANGFUSE_USER_ID: &str = "langfuse.user.id";
pub const BAGGAGE_LANGFUSE_SESSION_ID: &str = "langfuse.session.id";
pub const BAGGAGE_DEPLOYMENT_ENVIRONMENT: &str = "deployment.environment";
const PROPAGATED_KEYS: &[&str] = &[
BAGGAGE_USER_ID,
BAGGAGE_SESSION_ID,
BAGGAGE_LANGFUSE_USER_ID,
BAGGAGE_LANGFUSE_SESSION_ID,
BAGGAGE_DEPLOYMENT_ENVIRONMENT,
];
#[must_use]
pub fn with_user_id(cx: &Context, user_id: impl Into<String>) -> Context {
with_attribute(cx, BAGGAGE_USER_ID, user_id.into())
}
#[must_use]
pub fn with_session_id(cx: &Context, session_id: impl Into<String>) -> Context {
with_attribute(cx, BAGGAGE_SESSION_ID, session_id.into())
}
#[must_use]
pub fn with_attributes(cx: &Context, kvs: impl IntoIterator<Item = KeyValue>) -> Context {
let baggage: Baggage = cx
.baggage()
.iter()
.map(|(key, (value, metadata))| {
KeyValueMetadata::new(key.clone(), value.clone(), metadata.clone())
})
.chain(
kvs.into_iter()
.map(|kv| KeyValueMetadata::new(kv.key, kv.value, "")),
)
.collect();
cx.with_baggage(baggage)
}
fn with_attribute(cx: &Context, key: &'static str, value: String) -> Context {
with_attributes(cx, std::iter::once(KeyValue::new(key, value)))
}
pub fn copy_baggage_to_active_span(span: &mut BoxedSpan) {
if !span.is_recording() {
return;
}
let cx = Context::current();
let baggage = cx.baggage();
for &key in PROPAGATED_KEYS {
if let Some(value) = baggage.get(key) {
let value_string = value.as_str().to_string();
if key == BAGGAGE_SESSION_ID {
span.set_attribute(KeyValue::new(
attrs::GEN_AI_CONVERSATION_ID,
value_string.clone(),
));
}
span.set_attribute(KeyValue::new(key, value_string));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn with_user_id_preserves_existing_baggage() {
let cx = Context::current_with_baggage([KeyValue::new("trace.tag", "v1")]);
let cx = with_user_id(&cx, "user-42");
assert_eq!(
cx.baggage().get("trace.tag").map(ToString::to_string),
Some("v1".to_string())
);
assert_eq!(
cx.baggage().get(BAGGAGE_USER_ID).map(ToString::to_string),
Some("user-42".to_string())
);
}
#[test]
fn with_session_id_overwrites_previous_value() {
let cx = Context::current_with_baggage([KeyValue::new(BAGGAGE_SESSION_ID, "old")]);
let cx = with_session_id(&cx, "new");
assert_eq!(
cx.baggage()
.get(BAGGAGE_SESSION_ID)
.map(ToString::to_string),
Some("new".to_string())
);
}
#[test]
fn with_attributes_merges_multiple_keys() {
let cx = Context::current_with_baggage([KeyValue::new("trace.tag", "v1")]);
let cx = with_attributes(
&cx,
[
KeyValue::new(BAGGAGE_USER_ID, "alice"),
KeyValue::new(BAGGAGE_LANGFUSE_USER_ID, "alice"),
],
);
assert_eq!(
cx.baggage().get("trace.tag").map(ToString::to_string),
Some("v1".to_string())
);
assert_eq!(
cx.baggage().get(BAGGAGE_USER_ID).map(ToString::to_string),
Some("alice".to_string())
);
assert_eq!(
cx.baggage()
.get(BAGGAGE_LANGFUSE_USER_ID)
.map(ToString::to_string),
Some("alice".to_string())
);
}
#[test]
fn propagated_keys_constant_includes_all_advertised_constants() {
assert!(PROPAGATED_KEYS.contains(&BAGGAGE_USER_ID));
assert!(PROPAGATED_KEYS.contains(&BAGGAGE_SESSION_ID));
assert!(PROPAGATED_KEYS.contains(&BAGGAGE_LANGFUSE_USER_ID));
assert!(PROPAGATED_KEYS.contains(&BAGGAGE_LANGFUSE_SESSION_ID));
assert!(PROPAGATED_KEYS.contains(&BAGGAGE_DEPLOYMENT_ENVIRONMENT));
}
}