Skip to main content

langfuse/
client.rs

1//! The top-level Langfuse client that ties together all managers and the
2//! tracing pipeline.
3
4use dashmap::DashMap;
5use std::sync::OnceLock;
6
7use langfuse_core::config::LangfuseConfig;
8use langfuse_core::error::LangfuseError;
9use langfuse_core::types::ObservationType;
10use serde::Serialize;
11
12use crate::datasets::manager::DatasetManager;
13use crate::langfuse_tracing::exporter::LangfuseTracing;
14use crate::langfuse_tracing::generation::LangfuseGeneration;
15use crate::langfuse_tracing::span::LangfuseSpan;
16use crate::media::manager::MediaManager;
17use crate::prompts::manager::PromptManager;
18use crate::scoring::manager::ScoreManager;
19
20/// The main Langfuse client. Holds all managers and the tracing pipeline.
21pub struct Langfuse {
22    config: LangfuseConfig,
23    tracing: Option<LangfuseTracing>,
24    /// Prompt management: fetching, caching, and compilation.
25    pub prompts: PromptManager,
26    /// Score creation and batched submission.
27    pub scores: ScoreManager,
28    /// Dataset CRUD operations.
29    pub datasets: DatasetManager,
30    /// Media upload and fetch.
31    pub media: MediaManager,
32}
33
34impl std::fmt::Debug for Langfuse {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("Langfuse")
37            .field("config", &self.config)
38            .field("tracing", &self.tracing.is_some())
39            .finish()
40    }
41}
42
43/// Global singleton.
44static INSTANCE: OnceLock<Langfuse> = OnceLock::new();
45
46/// Named instances for multi-environment or multi-project usage.
47static NAMED_INSTANCES: OnceLock<DashMap<String, Langfuse>> = OnceLock::new();
48
49impl Langfuse {
50    /// Create a new Langfuse client from config.
51    pub fn new(config: LangfuseConfig) -> Result<Self, LangfuseError> {
52        let tracing = if config.tracing_enabled {
53            Some(LangfuseTracing::builder(&config).build()?)
54        } else {
55            None
56        };
57
58        Ok(Self {
59            prompts: PromptManager::new(&config),
60            scores: ScoreManager::new(&config),
61            datasets: DatasetManager::new(&config),
62            media: MediaManager::new(&config),
63            tracing,
64            config,
65        })
66    }
67
68    /// Create from environment variables.
69    pub fn from_env() -> Result<Self, LangfuseError> {
70        let config = LangfuseConfig::from_env()?;
71        Self::new(config)
72    }
73
74    /// Initialize the global singleton.
75    pub fn init(config: LangfuseConfig) -> Result<&'static Langfuse, LangfuseError> {
76        let instance = Self::new(config)?;
77        INSTANCE.set(instance).map_err(|_| {
78            LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
79                field: "global".into(),
80                message: "Langfuse already initialized".into(),
81            })
82        })?;
83        Ok(INSTANCE.get().unwrap())
84    }
85
86    /// Get the global singleton (panics if not initialized).
87    pub fn get() -> &'static Langfuse {
88        INSTANCE
89            .get()
90            .expect("Langfuse not initialized. Call Langfuse::init() first.")
91    }
92
93    /// Try to get the global singleton.
94    pub fn try_get() -> Option<&'static Langfuse> {
95        INSTANCE.get()
96    }
97
98    /// Initialize a named instance.
99    ///
100    /// Named instances are independent of the global singleton and allow
101    /// multiple Langfuse clients (e.g. for different projects or environments)
102    /// to coexist.
103    pub fn init_named(name: &str, config: LangfuseConfig) -> Result<(), LangfuseError> {
104        let instance = Self::new(config)?;
105        let map = NAMED_INSTANCES.get_or_init(DashMap::new);
106        map.insert(name.to_string(), instance);
107        Ok(())
108    }
109
110    /// Get a named instance by name.
111    ///
112    /// Returns `None` if no instance with the given name has been initialized.
113    pub fn get_named(name: &str) -> Option<dashmap::mapref::one::Ref<'static, String, Langfuse>> {
114        NAMED_INSTANCES.get().and_then(|map| map.get(name))
115    }
116
117    /// Try to get a named instance, returning an error if not found.
118    pub fn try_get_named(
119        name: &str,
120    ) -> Result<dashmap::mapref::one::Ref<'static, String, Langfuse>, LangfuseError> {
121        Self::get_named(name).ok_or_else(|| {
122            LangfuseError::Config(langfuse_core::error::ConfigError::InvalidValue {
123                field: "name".into(),
124                message: format!("Named instance '{name}' not initialized"),
125            })
126        })
127    }
128
129    /// Get the config.
130    pub fn config(&self) -> &LangfuseConfig {
131        &self.config
132    }
133
134    /// Check if authentication is valid by making a test API call.
135    pub async fn auth_check(&self) -> Result<(), LangfuseError> {
136        let url = format!("{}/projects", self.config.api_base_url());
137        let resp = reqwest::Client::new()
138            .get(&url)
139            .header("Authorization", self.config.basic_auth_header())
140            .send()
141            .await
142            .map_err(LangfuseError::Network)?;
143
144        if resp.status() == 401 || resp.status() == 403 {
145            return Err(LangfuseError::Auth);
146        }
147        if !resp.status().is_success() {
148            let status = resp.status().as_u16();
149            let message = resp.text().await.unwrap_or_default();
150            return Err(LangfuseError::Api { status, message });
151        }
152        Ok(())
153    }
154
155    /// Flush all pending data (scores, traces).
156    pub async fn flush(&self) -> Result<(), LangfuseError> {
157        self.scores.flush().await?;
158        if let Some(ref tracing) = self.tracing {
159            tracing
160                .shutdown()
161                .map_err(|e| LangfuseError::Otel(e.to_string()))?;
162        }
163        Ok(())
164    }
165
166    /// Shut down the client, flushing all pending data.
167    pub async fn shutdown(&self) -> Result<(), LangfuseError> {
168        self.flush().await
169    }
170
171    // ------------------------------------------------------------------
172    // Convenience tracing methods
173    // ------------------------------------------------------------------
174
175    /// Start a new span.
176    pub fn start_span(&self, name: &str) -> LangfuseSpan {
177        LangfuseSpan::start(name)
178    }
179
180    /// Start a new generation.
181    pub fn start_generation(&self, name: &str) -> LangfuseGeneration {
182        LangfuseGeneration::start(name)
183    }
184
185    /// Start a new span with a specific observation type.
186    pub fn start_span_with_type(&self, name: &str, obs_type: ObservationType) -> LangfuseSpan {
187        LangfuseSpan::start_with_type(name, obs_type)
188    }
189
190    /// Create a standalone root-level event observation.
191    ///
192    /// Events are zero-duration observations that carry input data.
193    /// The event span is created and immediately ended.
194    pub fn create_event(&self, name: &str, input: &impl Serialize) {
195        let span = LangfuseSpan::start_with_type(name, ObservationType::Event);
196        span.set_input(input);
197        span.end();
198    }
199
200    /// Generate the Langfuse UI URL for a trace.
201    pub fn get_trace_url(&self, trace_id: &str) -> String {
202        format!(
203            "{}/trace/{}",
204            self.config.base_url.trim_end_matches('/'),
205            trace_id
206        )
207    }
208
209    /// Register the internal tracer provider as the OpenTelemetry global provider.
210    ///
211    /// This **must** be called once after [`Langfuse::new`] for spans created via
212    /// [`LangfuseSpan::start`] (which uses `opentelemetry::global::tracer("langfuse")`)
213    /// to be exported through the Langfuse OTLP pipeline.
214    ///
215    /// Calling this more than once replaces the previous global provider.
216    pub fn register_tracing(&self) {
217        if let Some(ref tracing) = self.tracing {
218            opentelemetry::global::set_tracer_provider(tracing.provider().clone());
219        }
220    }
221}
222
223impl Drop for Langfuse {
224    fn drop(&mut self) {
225        // Attempt async flush using block_in_place if in multi-thread runtime.
226        // Skip gracefully if in current-thread runtime or no runtime.
227        if let Ok(handle) = tokio::runtime::Handle::try_current()
228            && handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread
229        {
230            tokio::task::block_in_place(|| {
231                handle.block_on(async {
232                    let _ = self.scores.shutdown().await;
233                    // Tracing provider shutdown handled separately
234                });
235            });
236        }
237    }
238}