Skip to main content

langfuse_ergonomic/
client.rs

1//! Main client for interacting with the Langfuse API
2
3use crate::batcher::{Batcher, BatcherConfig};
4use crate::error::{Error, Result};
5use langfuse_client_base::apis::configuration::Configuration;
6use std::sync::Arc;
7use std::time::Duration;
8
9/// SDK version for User-Agent header
10const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
11const SDK_NAME: &str = env!("CARGO_PKG_NAME");
12
/// Connection-establishment deadline used when the builder does not set one.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Whole-request deadline used when the builder does not set one.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
18
19/// Main client for interacting with the Langfuse API
20#[derive(Clone)]
21pub struct LangfuseClient {
22    pub(crate) public_key: String,
23    pub(crate) secret_key: String,
24    pub(crate) base_url: String,
25    pub(crate) configuration: Configuration,
26}
27
28impl LangfuseClient {
29    /// Get the underlying API configuration
30    pub fn configuration(&self) -> &Configuration {
31        &self.configuration
32    }
33
34    /// Validate that the client credentials are valid
35    pub async fn validate(&self) -> Result<bool> {
36        use crate::error::Error;
37
38        // Make a lightweight request to the health endpoint
39        let url = format!("{}/api/public/health", self.base_url);
40        let response = self
41            .configuration
42            .client
43            .get(&url)
44            .basic_auth(&self.public_key, Some(&self.secret_key))
45            .timeout(Duration::from_secs(5))
46            .send()
47            .await
48            .map_err(Error::Middleware)?;
49
50        // Check if we got a successful response
51        match response.status() {
52            status if status.is_success() => Ok(true),
53            status if status == 401 || status == 403 => Err(Error::Auth {
54                message: "Invalid credentials".to_string(),
55                request_id: response
56                    .headers()
57                    .get("x-request-id")
58                    .and_then(|v| v.to_str().ok())
59                    .map(|s| s.to_string()),
60            }),
61            status => Err(Error::Client {
62                status: status.as_u16(),
63                message: format!("Validation failed with status {}", status),
64                request_id: response
65                    .headers()
66                    .get("x-request-id")
67                    .and_then(|v| v.to_str().ok())
68                    .map(|s| s.to_string()),
69            }),
70        }
71    }
72
73    /// Create a batcher for efficient batch ingestion
74    ///
75    /// The batcher automatically handles:
76    /// - Batching events up to size/count limits
77    /// - Automatic flushing on intervals
78    /// - 207 Multi-Status response parsing
79    /// - Retrying only failed events
80    /// - Exponential backoff for retryable errors
81    ///
82    /// # Example
83    /// ```no_run
84    /// # use langfuse_ergonomic::{ClientBuilder, LangfuseClient, BatcherConfig};
85    /// # use std::sync::Arc;
86    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
87    /// let client = Arc::new(ClientBuilder::from_env()?.build()?);
88    /// let batcher = client.create_batcher(None).await;
89    /// # Ok(())
90    /// # }
91    /// ```
92    pub async fn create_batcher(self: Arc<Self>, config: Option<BatcherConfig>) -> Batcher {
93        // Clone the Arc to avoid moving self
94        let client = LangfuseClient {
95            public_key: self.public_key.clone(),
96            secret_key: self.secret_key.clone(),
97            base_url: self.base_url.clone(),
98            configuration: Configuration {
99                base_path: self.configuration.base_path.clone(),
100                basic_auth: self.configuration.basic_auth.clone(),
101                api_key: self.configuration.api_key.clone(),
102                oauth_access_token: self.configuration.oauth_access_token.clone(),
103                bearer_access_token: self.configuration.bearer_access_token.clone(),
104                client: self.configuration.client.clone(),
105                user_agent: self.configuration.user_agent.clone(),
106            },
107        };
108
109        let config = config.unwrap_or_default();
110
111        Batcher::builder()
112            .client(client)
113            .max_events(config.max_events)
114            .max_bytes(config.max_bytes)
115            .flush_interval(config.flush_interval)
116            .max_retries(config.max_retries)
117            .initial_retry_delay(config.initial_retry_delay)
118            .max_retry_delay(config.max_retry_delay)
119            .retry_jitter(config.retry_jitter)
120            .max_queue_size(config.max_queue_size)
121            .backpressure_policy(config.backpressure_policy)
122            .fail_fast(config.fail_fast)
123            .build()
124            .await
125    }
126
127    /// Start building a [`Batcher`] anchored to this client.
128    pub fn batcher(&self) -> crate::batcher::BatcherBuilderWithClient {
129        crate::batcher::Batcher::builder().client(self.clone())
130    }
131
132    fn build_internal(
133        public_key: String,
134        secret_key: String,
135        base_url: String,
136        timeout: Option<Duration>,
137        connect_timeout: Option<Duration>,
138        user_agent: Option<String>,
139        http_client: Option<reqwest_middleware::ClientWithMiddleware>,
140    ) -> Self {
141        // Use provided client or build a default one
142        let client = http_client.unwrap_or_else(|| {
143            #[allow(unused_mut)]
144            let mut client_builder = reqwest::Client::builder()
145                .timeout(timeout.unwrap_or(DEFAULT_TIMEOUT))
146                .connect_timeout(connect_timeout.unwrap_or(DEFAULT_CONNECT_TIMEOUT))
147                .pool_max_idle_per_host(10)
148                .pool_idle_timeout(Duration::from_secs(90));
149
150            #[cfg(not(feature = "compression"))]
151            {
152                client_builder = client_builder.no_gzip().no_brotli().no_deflate();
153            }
154
155            let reqwest_client = client_builder
156                .build()
157                .unwrap_or_else(|_| reqwest::Client::new());
158
159            // Wrap with middleware support
160            reqwest_middleware::ClientBuilder::new(reqwest_client).build()
161        });
162
163        let default_user_agent = format!("{}/{} (Rust)", SDK_NAME, SDK_VERSION);
164        let final_user_agent = user_agent.unwrap_or(default_user_agent);
165
166        let configuration = Configuration {
167            base_path: base_url.clone(),
168            basic_auth: Some((public_key.clone(), Some(secret_key.clone()))),
169            api_key: None,
170            oauth_access_token: None,
171            bearer_access_token: None,
172            client,
173            user_agent: Some(final_user_agent),
174        };
175
176        Self {
177            public_key,
178            secret_key,
179            base_url,
180            configuration,
181        }
182    }
183}
184
185/// Builder for [`LangfuseClient`], mirroring the style of `opentelemetry-langfuse`.
186#[derive(Default, Debug, Clone)]
187pub struct ClientBuilder {
188    public_key: Option<String>,
189    secret_key: Option<String>,
190    base_url: Option<String>,
191    timeout: Option<Duration>,
192    connect_timeout: Option<Duration>,
193    user_agent: Option<String>,
194    http_client: Option<reqwest_middleware::ClientWithMiddleware>,
195}
196
197impl ClientBuilder {
198    /// Start a new builder without credentials. Use [`ClientBuilder::public_key`] and
199    /// [`ClientBuilder::secret_key`] to provide them before calling [`ClientBuilder::build`].
200    pub fn new() -> Self {
201        Self::default()
202    }
203
204    /// Create a builder pre-populated from environment variables.
205    pub fn from_env() -> Result<Self> {
206        use std::env;
207
208        let public_key = env::var("LANGFUSE_PUBLIC_KEY").map_err(|_| {
209            Error::Configuration("LANGFUSE_PUBLIC_KEY environment variable not set".to_string())
210        })?;
211
212        let secret_key = env::var("LANGFUSE_SECRET_KEY").map_err(|_| {
213            Error::Configuration("LANGFUSE_SECRET_KEY environment variable not set".to_string())
214        })?;
215
216        let base_url = env::var("LANGFUSE_BASE_URL").ok();
217
218        Ok(Self {
219            public_key: Some(public_key),
220            secret_key: Some(secret_key),
221            base_url,
222            ..Self::default()
223        })
224    }
225
226    /// Set the public key used for authentication.
227    #[must_use]
228    pub fn public_key(mut self, value: impl Into<String>) -> Self {
229        self.public_key = Some(value.into());
230        self
231    }
232
233    /// Set the secret key used for authentication.
234    #[must_use]
235    pub fn secret_key(mut self, value: impl Into<String>) -> Self {
236        self.secret_key = Some(value.into());
237        self
238    }
239
240    /// Override the Langfuse base URL (defaults to `https://cloud.langfuse.com`).
241    #[must_use]
242    pub fn base_url(mut self, value: impl Into<String>) -> Self {
243        self.base_url = Some(value.into());
244        self
245    }
246
247    /// Override the request timeout (defaults to 60 seconds).
248    #[must_use]
249    pub fn timeout(mut self, value: Duration) -> Self {
250        self.timeout = Some(value);
251        self
252    }
253
254    /// Override the connection timeout (defaults to 10 seconds).
255    #[must_use]
256    pub fn connect_timeout(mut self, value: Duration) -> Self {
257        self.connect_timeout = Some(value);
258        self
259    }
260
261    /// Override the user agent string.
262    #[must_use]
263    pub fn user_agent(mut self, value: impl Into<String>) -> Self {
264        self.user_agent = Some(value.into());
265        self
266    }
267
268    /// Set a custom HTTP client with middleware.
269    ///
270    /// This allows you to provide a pre-configured `ClientWithMiddleware` with
271    /// custom settings like retry policies, connection pooling, logging, etc.
272    ///
273    /// # Example
274    ///
275    /// ```rust,ignore
276    /// use reqwest_middleware::ClientBuilder;
277    /// use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
278    ///
279    /// let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
280    /// let client = ClientBuilder::new(reqwest::Client::new())
281    ///     .with(RetryTransientMiddleware::new_with_policy(retry_policy))
282    ///     .build();
283    ///
284    /// let langfuse_client = langfuse_ergonomic::ClientBuilder::from_env()?
285    ///     .http_client(client)
286    ///     .build()?;
287    /// ```
288    #[must_use]
289    pub fn http_client(mut self, client: reqwest_middleware::ClientWithMiddleware) -> Self {
290        self.http_client = Some(client);
291        self
292    }
293
294    /// Build a [`LangfuseClient`] using the configured options.
295    pub fn build(self) -> Result<LangfuseClient> {
296        let public_key = self
297            .public_key
298            .ok_or_else(|| Error::Configuration("Langfuse public key is required".to_string()))?;
299        let secret_key = self
300            .secret_key
301            .ok_or_else(|| Error::Configuration("Langfuse secret key is required".to_string()))?;
302        let base_url = self
303            .base_url
304            .unwrap_or_else(|| "https://cloud.langfuse.com".to_string());
305
306        Ok(LangfuseClient::build_internal(
307            public_key,
308            secret_key,
309            base_url,
310            self.timeout,
311            self.connect_timeout,
312            self.user_agent,
313            self.http_client,
314        ))
315    }
316}