1use dashmap::DashMap;
5use std::sync::OnceLock;
6
7use langfuse_core::config::LangfuseConfig;
8use langfuse_core::error::LangfuseError;
9use langfuse_core::types::ObservationType;
10use serde::Serialize;
11
12use crate::datasets::manager::DatasetManager;
13use crate::langfuse_tracing::exporter::LangfuseTracing;
14use crate::langfuse_tracing::generation::LangfuseGeneration;
15use crate::langfuse_tracing::span::LangfuseSpan;
16use crate::media::manager::MediaManager;
17use crate::prompts::manager::PromptManager;
18use crate::scoring::manager::ScoreManager;
19
20pub struct Langfuse {
22 config: LangfuseConfig,
23 tracing: Option<LangfuseTracing>,
24 pub prompts: PromptManager,
26 pub scores: ScoreManager,
28 pub datasets: DatasetManager,
30 pub media: MediaManager,
32}
33
34impl std::fmt::Debug for Langfuse {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("Langfuse")
37 .field("config", &self.config)
38 .field("tracing", &self.tracing.is_some())
39 .finish()
40 }
41}
42
43static INSTANCE: OnceLock<Langfuse> = OnceLock::new();
45
46static NAMED_INSTANCES: OnceLock<DashMap<String, Langfuse>> = OnceLock::new();
48
49impl Langfuse {
50 pub fn new(config: LangfuseConfig) -> Result<Self, LangfuseError> {
52 let tracing = if config.tracing_enabled {
53 Some(LangfuseTracing::builder(&config).build()?)
54 } else {
55 None
56 };
57
58 Ok(Self {
59 prompts: PromptManager::new(&config),
60 scores: ScoreManager::new(&config),
61 datasets: DatasetManager::new(&config),
62 media: MediaManager::new(&config),
63 tracing,
64 config,
65 })
66 }
67
68 pub fn from_env() -> Result<Self, LangfuseError> {
70 let config = LangfuseConfig::from_env()?;
71 Self::new(config)
72 }
73
74 pub fn init(config: LangfuseConfig) -> Result<&'static Langfuse, LangfuseError> {
76 let instance = Self::new(config)?;
77 INSTANCE.set(instance).map_err(|_| {
78 LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
79 field: "global".into(),
80 message: "Langfuse already initialized".into(),
81 })
82 })?;
83 Ok(INSTANCE.get().unwrap())
84 }
85
86 pub fn get() -> &'static Langfuse {
88 INSTANCE
89 .get()
90 .expect("Langfuse not initialized. Call Langfuse::init() first.")
91 }
92
93 pub fn try_get() -> Option<&'static Langfuse> {
95 INSTANCE.get()
96 }
97
98 pub fn init_named(name: &str, config: LangfuseConfig) -> Result<(), LangfuseError> {
104 let instance = Self::new(config)?;
105 let map = NAMED_INSTANCES.get_or_init(DashMap::new);
106 map.insert(name.to_string(), instance);
107 Ok(())
108 }
109
110 pub fn get_named(name: &str) -> Option<dashmap::mapref::one::Ref<'static, String, Langfuse>> {
114 NAMED_INSTANCES.get().and_then(|map| map.get(name))
115 }
116
117 pub fn try_get_named(
119 name: &str,
120 ) -> Result<dashmap::mapref::one::Ref<'static, String, Langfuse>, LangfuseError> {
121 Self::get_named(name).ok_or_else(|| {
122 LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
123 field: "name".into(),
124 message: format!("Named instance '{name}' not initialized"),
125 })
126 })
127 }
128
129 pub fn config(&self) -> &LangfuseConfig {
131 &self.config
132 }
133
134 pub async fn auth_check(&self) -> Result<(), LangfuseError> {
136 let url = format!("{}/projects", self.config.api_base_url());
137 let resp = reqwest::Client::new()
138 .get(&url)
139 .header("Authorization", self.config.basic_auth_header())
140 .send()
141 .await
142 .map_err(LangfuseError::Network)?;
143
144 if resp.status() == 401 || resp.status() == 403 {
145 return Err(LangfuseError::Auth);
146 }
147 if !resp.status().is_success() {
148 let status = resp.status().as_u16();
149 let message = resp.text().await.unwrap_or_default();
150 return Err(LangfuseError::Api { status, message });
151 }
152 Ok(())
153 }
154
155 pub async fn flush(&self) -> Result<(), LangfuseError> {
157 self.scores.flush().await?;
158 if let Some(ref tracing) = self.tracing {
159 tracing
160 .shutdown()
161 .map_err(|e| LangfuseError::Otel(e.to_string()))?;
162 }
163 Ok(())
164 }
165
166 pub async fn shutdown(&self) -> Result<(), LangfuseError> {
168 self.flush().await
169 }
170
171 pub fn start_span(&self, name: &str) -> LangfuseSpan {
177 LangfuseSpan::start(name)
178 }
179
180 pub fn start_generation(&self, name: &str) -> LangfuseGeneration {
182 LangfuseGeneration::start(name)
183 }
184
185 pub fn start_span_with_type(&self, name: &str, obs_type: ObservationType) -> LangfuseSpan {
187 LangfuseSpan::start_with_type(name, obs_type)
188 }
189
190 pub fn create_event(&self, name: &str, input: &impl Serialize) {
195 let span = LangfuseSpan::start_with_type(name, ObservationType::Event);
196 span.set_input(input);
197 span.end();
198 }
199
200 pub fn get_trace_url(&self, trace_id: &str) -> String {
202 format!(
203 "{}/trace/{}",
204 self.config.base_url.trim_end_matches('/'),
205 trace_id
206 )
207 }
208
209 pub fn register_tracing(&self) {
217 if let Some(ref tracing) = self.tracing {
218 opentelemetry::global::set_tracer_provider(tracing.provider().clone());
219 }
220 }
221}
222
223impl Drop for Langfuse {
224 fn drop(&mut self) {
225 if let Ok(handle) = tokio::runtime::Handle::try_current()
228 && handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread
229 {
230 tokio::task::block_in_place(|| {
231 handle.block_on(async {
232 let _ = self.scores.shutdown().await;
233 });
235 });
236 }
237 }
238}