use dashmap::DashMap;
use std::sync::OnceLock;
use langfuse_core::config::LangfuseConfig;
use langfuse_core::error::LangfuseError;
use langfuse_core::types::ObservationType;
use serde::Serialize;
use crate::datasets::manager::DatasetManager;
use crate::langfuse_tracing::exporter::LangfuseTracing;
use crate::langfuse_tracing::generation::LangfuseGeneration;
use crate::langfuse_tracing::span::LangfuseSpan;
use crate::media::manager::MediaManager;
use crate::prompts::manager::PromptManager;
use crate::scoring::manager::ScoreManager;
pub struct Langfuse {
config: LangfuseConfig,
tracing: Option<LangfuseTracing>,
pub prompts: PromptManager,
pub scores: ScoreManager,
pub datasets: DatasetManager,
pub media: MediaManager,
}
impl std::fmt::Debug for Langfuse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Langfuse")
.field("config", &self.config)
.field("tracing", &self.tracing.is_some())
.finish()
}
}
static INSTANCE: OnceLock<Langfuse> = OnceLock::new();
static NAMED_INSTANCES: OnceLock<DashMap<String, Langfuse>> = OnceLock::new();
impl Langfuse {
pub fn new(config: LangfuseConfig) -> Result<Self, LangfuseError> {
let tracing = if config.tracing_enabled {
Some(LangfuseTracing::builder(&config).build()?)
} else {
None
};
Ok(Self {
prompts: PromptManager::new(&config),
scores: ScoreManager::new(&config),
datasets: DatasetManager::new(&config),
media: MediaManager::new(&config),
tracing,
config,
})
}
pub fn from_env() -> Result<Self, LangfuseError> {
let config = LangfuseConfig::from_env()?;
Self::new(config)
}
pub fn init(config: LangfuseConfig) -> Result<&'static Langfuse, LangfuseError> {
let instance = Self::new(config)?;
INSTANCE.set(instance).map_err(|_| {
LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
field: "global".into(),
message: "Langfuse already initialized".into(),
})
})?;
Ok(INSTANCE.get().unwrap())
}
pub fn get() -> &'static Langfuse {
INSTANCE
.get()
.expect("Langfuse not initialized. Call Langfuse::init() first.")
}
pub fn try_get() -> Option<&'static Langfuse> {
INSTANCE.get()
}
pub fn init_named(name: &str, config: LangfuseConfig) -> Result<(), LangfuseError> {
let instance = Self::new(config)?;
let map = NAMED_INSTANCES.get_or_init(DashMap::new);
map.insert(name.to_string(), instance);
Ok(())
}
pub fn get_named(name: &str) -> Option<dashmap::mapref::one::Ref<'static, String, Langfuse>> {
NAMED_INSTANCES.get().and_then(|map| map.get(name))
}
pub fn try_get_named(
name: &str,
) -> Result<dashmap::mapref::one::Ref<'static, String, Langfuse>, LangfuseError> {
Self::get_named(name).ok_or_else(|| {
LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
field: "name".into(),
message: format!("Named instance '{name}' not initialized"),
})
})
}
pub fn config(&self) -> &LangfuseConfig {
&self.config
}
pub async fn auth_check(&self) -> Result<(), LangfuseError> {
let url = format!("{}/projects", self.config.api_base_url());
let resp = reqwest::Client::new()
.get(&url)
.header("Authorization", self.config.basic_auth_header())
.send()
.await
.map_err(LangfuseError::Network)?;
if resp.status() == 401 || resp.status() == 403 {
return Err(LangfuseError::Auth);
}
if !resp.status().is_success() {
let status = resp.status().as_u16();
let message = resp.text().await.unwrap_or_default();
return Err(LangfuseError::Api { status, message });
}
Ok(())
}
pub async fn flush(&self) -> Result<(), LangfuseError> {
self.scores.flush().await?;
if let Some(ref tracing) = self.tracing {
tracing
.shutdown()
.map_err(|e| LangfuseError::Otel(e.to_string()))?;
}
Ok(())
}
pub async fn shutdown(&self) -> Result<(), LangfuseError> {
self.flush().await
}
pub fn start_span(&self, name: &str) -> LangfuseSpan {
LangfuseSpan::start(name)
}
pub fn start_generation(&self, name: &str) -> LangfuseGeneration {
LangfuseGeneration::start(name)
}
pub fn start_span_with_type(&self, name: &str, obs_type: ObservationType) -> LangfuseSpan {
LangfuseSpan::start_with_type(name, obs_type)
}
pub fn create_event(&self, name: &str, input: &impl Serialize) {
let span = LangfuseSpan::start_with_type(name, ObservationType::Event);
span.set_input(input);
span.end();
}
pub fn get_trace_url(&self, trace_id: &str) -> String {
format!(
"{}/trace/{}",
self.config.base_url.trim_end_matches('/'),
trace_id
)
}
pub fn register_tracing(&self) {
if let Some(ref tracing) = self.tracing {
opentelemetry::global::set_tracer_provider(tracing.provider().clone());
}
}
}
impl Drop for Langfuse {
fn drop(&mut self) {
if let Ok(handle) = tokio::runtime::Handle::try_current()
&& handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread
{
tokio::task::block_in_place(|| {
handle.block_on(async {
let _ = self.scores.shutdown().await;
});
});
}
}
}