1use crate::error::ConfigError;
4use base64::Engine as _;
5use base64::engine::general_purpose::STANDARD as BASE64;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Shared, thread-safe callback that takes a JSON value by ownership and
/// returns a JSON value.
///
/// Stored in [`LangfuseConfig::mask`]. The `Arc` makes clones of the
/// configuration share one closure instead of copying it.
// NOTE(review): presumably applied to captured payloads to redact sensitive
// data before export — confirm at the call site.
pub type MaskFn = Arc<dyn Fn(serde_json::Value) -> serde_json::Value + Send + Sync>;
12
/// Shared, thread-safe predicate over a borrowed OpenTelemetry `SpanData`.
///
/// Stored in [`LangfuseConfig::should_export_span`].
// NOTE(review): presumably `true` means "export this span" — confirm against
// the exporter that consumes it.
pub type SpanFilterFn = Arc<dyn Fn(&opentelemetry_sdk::trace::SpanData) -> bool + Send + Sync>;
15
/// Runtime configuration for the Langfuse client.
///
/// Build one with [`LangfuseConfig::builder`] or [`LangfuseConfig::from_env`].
/// The defaults quoted on the fields below are the ones both constructors in
/// this file apply when a value is not supplied.
pub struct LangfuseConfig {
    /// Public API key; the user-name half of the credentials produced by
    /// `basic_auth_header`. Required.
    pub public_key: String,
    /// Secret API key; the password half of the credentials produced by
    /// `basic_auth_header`. Required. The hand-written `Debug` impl prints a
    /// placeholder instead of this value.
    pub secret_key: String,
    /// Base URL that `otel_endpoint` and `api_base_url` append their paths to.
    /// Default: `https://cloud.langfuse.com`.
    pub base_url: String,
    /// Default: 5 seconds.
    // NOTE(review): presumably the per-request HTTP timeout — confirm.
    pub timeout: Duration,
    /// Default: 512.
    // NOTE(review): presumably the number of buffered items that triggers a
    // flush — confirm against the exporter.
    pub flush_at: usize,
    /// Default: 5 seconds.
    // NOTE(review): presumably the maximum delay between flushes — confirm.
    pub flush_interval: Duration,
    /// Default: 1.0. No range check is applied in this file.
    // NOTE(review): presumably a fraction in [0, 1] of traces to keep — confirm.
    pub sample_rate: f64,
    /// Optional environment label (`LANGFUSE_TRACING_ENVIRONMENT` in `from_env`).
    pub environment: Option<String>,
    /// Optional release label (`LANGFUSE_RELEASE` in `from_env`).
    pub release: Option<String>,
    /// Debug switch. Default: `false`.
    pub debug: bool,
    /// Tracing switch. Default: `true`.
    pub tracing_enabled: bool,
    /// Optional masking callback; never populated by `from_env`.
    pub mask: Option<MaskFn>,
    /// Optional extra headers; never populated by `from_env`. Printed verbatim
    /// by the `Debug` impl.
    pub additional_headers: Option<HashMap<String, String>>,
    /// Default: 3.
    // NOTE(review): presumably the retry budget for failed requests — confirm.
    pub max_retries: usize,
    /// Default: 4.
    // NOTE(review): presumably the number of media upload workers — confirm.
    pub media_upload_thread_count: usize,
    /// Input/output capture switch
    /// (`LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED` in `from_env`).
    /// Default: `true`.
    pub io_capture_enabled: bool,
    /// Optional span predicate; never populated by `from_env`.
    pub should_export_span: Option<SpanFilterFn>,
}
56
57impl std::fmt::Debug for LangfuseConfig {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("LangfuseConfig")
60 .field("public_key", &self.public_key)
61 .field("secret_key", &"[REDACTED]")
62 .field("base_url", &self.base_url)
63 .field("timeout", &self.timeout)
64 .field("flush_at", &self.flush_at)
65 .field("flush_interval", &self.flush_interval)
66 .field("sample_rate", &self.sample_rate)
67 .field("environment", &self.environment)
68 .field("release", &self.release)
69 .field("debug", &self.debug)
70 .field("tracing_enabled", &self.tracing_enabled)
71 .field(
72 "mask",
73 if self.mask.is_some() {
74 &"Some(<mask fn>)"
75 } else {
76 &"None"
77 },
78 )
79 .field("additional_headers", &self.additional_headers)
80 .field("max_retries", &self.max_retries)
81 .field("media_upload_thread_count", &self.media_upload_thread_count)
82 .field("io_capture_enabled", &self.io_capture_enabled)
83 .field(
84 "should_export_span",
85 if self.should_export_span.is_some() {
86 &"Some(<filter fn>)"
87 } else {
88 &"None"
89 },
90 )
91 .finish()
92 }
93}
94
95impl Clone for LangfuseConfig {
96 fn clone(&self) -> Self {
97 Self {
98 public_key: self.public_key.clone(),
99 secret_key: self.secret_key.clone(),
100 base_url: self.base_url.clone(),
101 timeout: self.timeout,
102 flush_at: self.flush_at,
103 flush_interval: self.flush_interval,
104 sample_rate: self.sample_rate,
105 environment: self.environment.clone(),
106 release: self.release.clone(),
107 debug: self.debug,
108 tracing_enabled: self.tracing_enabled,
109 mask: self.mask.clone(),
110 additional_headers: self.additional_headers.clone(),
111 max_retries: self.max_retries,
112 media_upload_thread_count: self.media_upload_thread_count,
113 io_capture_enabled: self.io_capture_enabled,
114 should_export_span: self.should_export_span.clone(),
115 }
116 }
117}
118
119impl LangfuseConfig {
120 #[must_use]
122 pub fn builder() -> LangfuseConfigBuilder {
123 LangfuseConfigBuilder::default()
124 }
125
126 pub fn from_env() -> std::result::Result<Self, ConfigError> {
136 let public_key =
137 std::env::var("LANGFUSE_PUBLIC_KEY").map_err(|_| ConfigError::MissingField {
138 field: "LANGFUSE_PUBLIC_KEY".into(),
139 })?;
140 let secret_key =
141 std::env::var("LANGFUSE_SECRET_KEY").map_err(|_| ConfigError::MissingField {
142 field: "LANGFUSE_SECRET_KEY".into(),
143 })?;
144
145 let base_url = std::env::var("LANGFUSE_BASE_URL")
146 .unwrap_or_else(|_| "https://cloud.langfuse.com".into());
147 let timeout = std::env::var("LANGFUSE_TIMEOUT")
148 .ok()
149 .and_then(|v| v.parse::<u64>().ok())
150 .map(Duration::from_secs)
151 .unwrap_or(Duration::from_secs(5));
152 let flush_at = std::env::var("LANGFUSE_FLUSH_AT")
153 .ok()
154 .and_then(|v| v.parse().ok())
155 .unwrap_or(512);
156 let flush_interval = std::env::var("LANGFUSE_FLUSH_INTERVAL")
157 .ok()
158 .and_then(|v| v.parse::<u64>().ok())
159 .map(Duration::from_secs)
160 .unwrap_or(Duration::from_secs(5));
161 let sample_rate = std::env::var("LANGFUSE_SAMPLE_RATE")
162 .ok()
163 .and_then(|v| v.parse().ok())
164 .unwrap_or(1.0);
165 let environment = std::env::var("LANGFUSE_TRACING_ENVIRONMENT").ok();
166 let release = std::env::var("LANGFUSE_RELEASE").ok();
167 let debug = std::env::var("LANGFUSE_DEBUG")
168 .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
169 .unwrap_or(false);
170 let tracing_enabled = std::env::var("LANGFUSE_TRACING_ENABLED")
171 .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
172 .unwrap_or(true);
173 let max_retries = std::env::var("LANGFUSE_MAX_RETRIES")
174 .ok()
175 .and_then(|v| v.parse().ok())
176 .unwrap_or(3);
177 let media_upload_thread_count = std::env::var("LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT")
178 .ok()
179 .and_then(|v| v.parse().ok())
180 .unwrap_or(4);
181 let io_capture_enabled = std::env::var("LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED")
182 .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
183 .unwrap_or(true);
184
185 Ok(Self {
186 public_key,
187 secret_key,
188 base_url,
189 timeout,
190 flush_at,
191 flush_interval,
192 sample_rate,
193 environment,
194 release,
195 debug,
196 tracing_enabled,
197 mask: None,
198 additional_headers: None,
199 max_retries,
200 media_upload_thread_count,
201 io_capture_enabled,
202 should_export_span: None,
203 })
204 }
205
206 pub fn basic_auth_header(&self) -> String {
208 let credentials = format!("{}:{}", self.public_key, self.secret_key);
209 format!("Basic {}", BASE64.encode(credentials.as_bytes()))
210 }
211
212 pub fn otel_endpoint(&self) -> String {
214 format!(
215 "{}/api/public/otel/v1/traces",
216 self.base_url.trim_end_matches('/')
217 )
218 }
219
220 pub fn api_base_url(&self) -> String {
222 format!("{}/api/public", self.base_url.trim_end_matches('/'))
223 }
224}
225
/// Builder for [`LangfuseConfig`].
///
/// Each field mirrors the `LangfuseConfig` field of the same name and is
/// `None` until the matching setter is called. `build` requires `public_key`
/// and `secret_key` and substitutes defaults for the other unset options.
#[derive(Default)]
pub struct LangfuseConfigBuilder {
    // Required by `build`.
    public_key: Option<String>,
    // Required by `build`.
    secret_key: Option<String>,
    // `build` default: "https://cloud.langfuse.com".
    base_url: Option<String>,
    // `build` default: 5 seconds.
    timeout: Option<Duration>,
    // `build` default: 512.
    flush_at: Option<usize>,
    // `build` default: 5 seconds.
    flush_interval: Option<Duration>,
    // `build` default: 1.0.
    sample_rate: Option<f64>,
    // Passed through unchanged by `build`.
    environment: Option<String>,
    // Passed through unchanged by `build`.
    release: Option<String>,
    // `build` default: false.
    debug: Option<bool>,
    // `build` default: true.
    tracing_enabled: Option<bool>,
    // Passed through unchanged by `build`.
    mask: Option<MaskFn>,
    // Passed through unchanged by `build`.
    additional_headers: Option<HashMap<String, String>>,
    // `build` default: 3.
    max_retries: Option<usize>,
    // `build` default: 4.
    media_upload_thread_count: Option<usize>,
    // `build` default: true.
    io_capture_enabled: Option<bool>,
    // Passed through unchanged by `build`.
    should_export_span: Option<SpanFilterFn>,
}
247
248impl LangfuseConfigBuilder {
249 #[must_use]
251 pub fn public_key(mut self, key: impl Into<String>) -> Self {
252 self.public_key = Some(key.into());
253 self
254 }
255
256 #[must_use]
258 pub fn secret_key(mut self, key: impl Into<String>) -> Self {
259 self.secret_key = Some(key.into());
260 self
261 }
262
263 #[must_use]
265 pub fn base_url(mut self, url: impl Into<String>) -> Self {
266 self.base_url = Some(url.into());
267 self
268 }
269
270 #[must_use]
272 pub fn timeout(mut self, timeout: Duration) -> Self {
273 self.timeout = Some(timeout);
274 self
275 }
276
277 #[must_use]
279 pub fn flush_at(mut self, count: usize) -> Self {
280 self.flush_at = Some(count);
281 self
282 }
283
284 #[must_use]
286 pub fn flush_interval(mut self, interval: Duration) -> Self {
287 self.flush_interval = Some(interval);
288 self
289 }
290
291 #[must_use]
293 pub fn sample_rate(mut self, rate: f64) -> Self {
294 self.sample_rate = Some(rate);
295 self
296 }
297
298 #[must_use]
300 pub fn environment(mut self, env: impl Into<String>) -> Self {
301 self.environment = Some(env.into());
302 self
303 }
304
305 #[must_use]
307 pub fn release(mut self, release: impl Into<String>) -> Self {
308 self.release = Some(release.into());
309 self
310 }
311
312 #[must_use]
314 pub fn debug(mut self, debug: bool) -> Self {
315 self.debug = Some(debug);
316 self
317 }
318
319 #[must_use]
321 pub fn tracing_enabled(mut self, enabled: bool) -> Self {
322 self.tracing_enabled = Some(enabled);
323 self
324 }
325
326 #[must_use]
328 pub fn mask(
329 mut self,
330 f: impl Fn(serde_json::Value) -> serde_json::Value + Send + Sync + 'static,
331 ) -> Self {
332 self.mask = Some(Arc::new(f));
333 self
334 }
335
336 #[must_use]
338 pub fn additional_headers(mut self, headers: HashMap<String, String>) -> Self {
339 self.additional_headers = Some(headers);
340 self
341 }
342
343 #[must_use]
345 pub fn max_retries(mut self, retries: usize) -> Self {
346 self.max_retries = Some(retries);
347 self
348 }
349
350 #[must_use]
352 pub fn media_upload_thread_count(mut self, count: usize) -> Self {
353 self.media_upload_thread_count = Some(count);
354 self
355 }
356
357 #[must_use]
359 pub fn io_capture_enabled(mut self, enabled: bool) -> Self {
360 self.io_capture_enabled = Some(enabled);
361 self
362 }
363
364 #[must_use]
366 pub fn should_export_span(
367 mut self,
368 f: impl Fn(&opentelemetry_sdk::trace::SpanData) -> bool + Send + Sync + 'static,
369 ) -> Self {
370 self.should_export_span = Some(Arc::new(f));
371 self
372 }
373
374 pub fn build(self) -> std::result::Result<LangfuseConfig, ConfigError> {
376 let public_key = self.public_key.ok_or_else(|| ConfigError::MissingField {
377 field: "public_key".into(),
378 })?;
379 let secret_key = self.secret_key.ok_or_else(|| ConfigError::MissingField {
380 field: "secret_key".into(),
381 })?;
382
383 Ok(LangfuseConfig {
384 public_key,
385 secret_key,
386 base_url: self
387 .base_url
388 .unwrap_or_else(|| "https://cloud.langfuse.com".into()),
389 timeout: self.timeout.unwrap_or(Duration::from_secs(5)),
390 flush_at: self.flush_at.unwrap_or(512),
391 flush_interval: self.flush_interval.unwrap_or(Duration::from_secs(5)),
392 sample_rate: self.sample_rate.unwrap_or(1.0),
393 environment: self.environment,
394 release: self.release,
395 debug: self.debug.unwrap_or(false),
396 tracing_enabled: self.tracing_enabled.unwrap_or(true),
397 mask: self.mask,
398 additional_headers: self.additional_headers,
399 max_retries: self.max_retries.unwrap_or(3),
400 media_upload_thread_count: self.media_upload_thread_count.unwrap_or(4),
401 io_capture_enabled: self.io_capture_enabled.unwrap_or(true),
402 should_export_span: self.should_export_span,
403 })
404 }
405}