use langfuse_core::types::PropagateAttributes;
use opentelemetry::Context;
use opentelemetry::baggage::BaggageExt;
use opentelemetry::trace::TraceContextExt;
use super::attributes;
use super::span::LangfuseSpan;
const BAGGAGE_TRACE_ID: &str = "langfuse.trace_id";
const BAGGAGE_USER_ID: &str = "langfuse.user_id";
const BAGGAGE_SESSION_ID: &str = "langfuse.session_id";
const BAGGAGE_TAGS: &str = "langfuse.tags";
pub fn get_current_trace_id() -> Option<String> {
let cx = Context::current();
let span = cx.span();
let sc = span.span_context();
if sc.is_valid() {
Some(sc.trace_id().to_string())
} else {
cx.baggage()
.get(BAGGAGE_TRACE_ID)
.map(|value| value.as_str().to_owned())
}
}
pub fn get_current_observation_id() -> Option<String> {
let cx = Context::current();
let span = cx.span();
let sc = span.span_context();
if sc.is_valid() {
Some(sc.span_id().to_string())
} else {
None
}
}
pub fn propagate_attributes(attrs: &PropagateAttributes, f: impl FnOnce()) {
let _guard = PropagatedAttrsGuard::install(attrs);
if attrs.as_baggage {
propagate_as_baggage(attrs, f);
} else {
f();
}
}
pub fn propagate_as_baggage(attrs: &PropagateAttributes, f: impl FnOnce()) {
let cx = Context::current();
let mut baggage: opentelemetry::baggage::Baggage = cx
.baggage()
.iter()
.map(|(key, (value, metadata))| (key.clone(), (value.clone(), metadata.clone())))
.collect();
let span_ref = cx.span();
let sc = span_ref.span_context();
if sc.is_valid() {
let _ = baggage.insert(BAGGAGE_TRACE_ID, sc.trace_id().to_string());
}
if let Some(ref user_id) = attrs.user_id {
let _ = baggage.insert(BAGGAGE_USER_ID, user_id.clone());
}
if let Some(ref session_id) = attrs.session_id {
let _ = baggage.insert(BAGGAGE_SESSION_ID, session_id.clone());
}
if let Some(ref tags) = attrs.tags
&& let Ok(tags_json) = serde_json::to_string(tags)
{
let _ = baggage.insert(BAGGAGE_TAGS, tags_json);
}
let cx_with_baggage = cx.with_baggage(baggage);
let _guard = cx_with_baggage.attach();
f();
}
#[must_use]
pub fn read_propagated_attributes_from_baggage() -> PropagateAttributes {
read_propagated_attributes_from_context(&Context::current())
}
pub(crate) fn apply_propagated_attributes(span: &LangfuseSpan) {
let explicit_attrs = PROPAGATED_ATTRS.with(|cell| cell.borrow().clone());
if let Some(attrs) = resolve_propagated_attributes(explicit_attrs) {
if let Some(ref user_id) = attrs.user_id {
span.set_string_attribute(attributes::LANGFUSE_USER_ID, user_id);
}
if let Some(ref session_id) = attrs.session_id {
span.set_string_attribute(attributes::LANGFUSE_SESSION_ID, session_id);
}
if let Some(ref metadata) = attrs.metadata {
span.set_json_attribute(attributes::LANGFUSE_METADATA, metadata);
}
if let Some(ref version) = attrs.version {
span.set_string_attribute(attributes::LANGFUSE_VERSION, version);
}
if let Some(ref tags) = attrs.tags {
let tag_refs: Vec<&str> = tags.iter().map(String::as_str).collect();
span.set_json_attribute(attributes::LANGFUSE_TAGS, &tag_refs);
}
if let Some(ref trace_name) = attrs.trace_name {
span.set_string_attribute("langfuse.trace.name", trace_name);
}
}
}
fn resolve_propagated_attributes(
explicit_attrs: Option<PropagateAttributes>,
) -> Option<PropagateAttributes> {
let from_baggage = read_propagated_attributes_from_context(&Context::current());
let mut merged = explicit_attrs.unwrap_or_default();
if merged.user_id.is_none() {
merged.user_id = from_baggage.user_id;
}
if merged.session_id.is_none() {
merged.session_id = from_baggage.session_id;
}
if merged.tags.is_none() {
merged.tags = from_baggage.tags;
}
if has_propagated_values(&merged) {
Some(merged)
} else {
None
}
}
fn read_propagated_attributes_from_context(cx: &Context) -> PropagateAttributes {
let baggage = cx.baggage();
let user_id = baggage
.get(BAGGAGE_USER_ID)
.map(|value| value.as_str().to_owned());
let session_id = baggage
.get(BAGGAGE_SESSION_ID)
.map(|value| value.as_str().to_owned());
let tags = baggage
.get(BAGGAGE_TAGS)
.and_then(|value| serde_json::from_str::<Vec<String>>(value.as_str()).ok())
.filter(|tags| !tags.is_empty());
PropagateAttributes {
user_id,
session_id,
tags,
..Default::default()
}
}
struct PropagatedAttrsGuard {
prev: Option<PropagateAttributes>,
}
impl PropagatedAttrsGuard {
fn install(attrs: &PropagateAttributes) -> Self {
let prev = PROPAGATED_ATTRS.with(|cell| cell.replace(Some(attrs.clone())));
Self { prev }
}
}
impl Drop for PropagatedAttrsGuard {
fn drop(&mut self) {
PROPAGATED_ATTRS.with(|cell| {
let _ = cell.replace(self.prev.clone());
});
}
}
const fn has_propagated_values(attrs: &PropagateAttributes) -> bool {
attrs.user_id.is_some()
|| attrs.session_id.is_some()
|| attrs.metadata.is_some()
|| attrs.version.is_some()
|| attrs.tags.is_some()
|| attrs.trace_name.is_some()
}
std::thread_local! {
static PROPAGATED_ATTRS: std::cell::RefCell<Option<PropagateAttributes>> = const { std::cell::RefCell::new(None) };
}