1use std::fmt;
4use std::time::Duration;
5use thiserror::Error;
6
7pub type Result<T> = std::result::Result<T, Error>;
9
10#[derive(Error, Debug)]
11pub enum Error {
12 #[error("API error: {0}")]
13 Api(String),
14
15 #[error("Configuration error: {0}")]
16 Configuration(String),
17
18 #[error("Validation error: {0}")]
19 Validation(String),
20
21 #[error("Serialization error: {0}")]
22 Serialization(#[from] serde_json::Error),
23
24 #[error("Network error: {0}")]
25 Network(#[from] reqwest::Error),
26
27 #[error("Middleware error: {0}")]
28 Middleware(#[from] reqwest_middleware::Error),
29
30 #[error("Authentication failed: {message}")]
32 Auth {
33 message: String,
34 request_id: Option<String>,
36 },
37
38 #[error("Rate limit exceeded (retry after {retry_after:?})")]
40 RateLimit {
41 retry_after: Option<Duration>,
43 request_id: Option<String>,
45 },
46
47 #[error("Server error (status {status}): {message}")]
49 Server {
50 status: u16,
51 message: String,
52 request_id: Option<String>,
54 },
55
56 #[error("Client error (status {status}): {message}")]
58 Client {
59 status: u16,
60 message: String,
61 request_id: Option<String>,
63 },
64
65 #[error("Partial batch failure: {success_count} succeeded, {failure_count} failed")]
67 PartialFailure {
68 success_count: usize,
69 failure_count: usize,
70 errors: Vec<EventError>,
72 success_ids: Vec<String>,
74 },
75
76 #[error("Batch size exceeded: {size} bytes (max: {max_size} bytes)")]
78 BatchSizeExceeded { size: usize, max_size: usize },
79
80 #[error("Backpressure: {reason} (policy: {policy:?})")]
82 Backpressure {
83 policy: crate::BackpressurePolicy,
85 reason: String,
87 },
88}
89
90#[derive(Debug, Clone)]
92pub struct EventError {
93 pub event_id: String,
95 pub message: String,
97 pub code: Option<String>,
99 pub retryable: bool,
101}
102
103impl fmt::Display for EventError {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(f, "Event {} failed: {}", self.event_id, self.message)?;
106 if let Some(ref code) = self.code {
107 write!(f, " (code: {})", code)?;
108 }
109 if self.retryable {
110 write!(f, " [retryable]")?;
111 }
112 Ok(())
113 }
114}
115
116impl Error {
117 pub fn is_retryable(&self) -> bool {
119 match self {
120 Error::Network(_) => true,
121 Error::Middleware(_) => true,
122 Error::RateLimit { .. } => true,
123 Error::Server { .. } => true,
124 Error::PartialFailure { .. } => true,
125 Error::Auth { .. } => false,
126 Error::Client { .. } => false,
127 Error::Validation(_) => false,
128 Error::Serialization(_) => false,
129 Error::Configuration(_) => false,
130 Error::Api(_) => false,
131 Error::BatchSizeExceeded { .. } => false,
132 Error::Backpressure { .. } => false,
133 }
134 }
135
136 pub fn retry_after(&self) -> Option<Duration> {
138 match self {
139 Error::RateLimit { retry_after, .. } => *retry_after,
140 Error::Server { .. } => Some(Duration::from_secs(5)), Error::Network(_) => Some(Duration::from_secs(1)), _ => None,
143 }
144 }
145
146 pub fn request_id(&self) -> Option<&str> {
148 match self {
149 Error::Auth { request_id, .. }
150 | Error::RateLimit { request_id, .. }
151 | Error::Server { request_id, .. }
152 | Error::Client { request_id, .. } => request_id.as_deref(),
153 _ => None,
154 }
155 }
156}
157
158#[derive(Debug)]
160pub struct IngestionResponse {
161 pub success_ids: Vec<String>,
163 pub failures: Vec<EventError>,
165 pub success_count: usize,
167 pub failure_count: usize,
168}
169
170impl IngestionResponse {
171 pub fn is_success(&self) -> bool {
173 self.failure_count == 0
174 }
175
176 pub fn is_partial_failure(&self) -> bool {
178 self.success_count > 0 && self.failure_count > 0
179 }
180
181 pub fn to_error(&self) -> Option<Error> {
183 if self.failure_count > 0 {
184 Some(Error::PartialFailure {
185 success_count: self.success_count,
186 failure_count: self.failure_count,
187 errors: self.failures.clone(),
188 success_ids: self.success_ids.clone(),
189 })
190 } else {
191 None
192 }
193 }
194}
195
196pub fn map_api_error<T>(err: langfuse_client_base::apis::Error<T>) -> Error {
198 use langfuse_client_base::apis::Error as ApiError;
199
200 match err {
201 ApiError::Reqwest(e) => Error::Network(e),
202 ApiError::ReqwestMiddleware(e) => Error::Middleware(e),
203 ApiError::Serde(e) => Error::Serialization(e),
204 ApiError::Io(e) => Error::Api(format!("IO error: {}", e)),
205 ApiError::ResponseError(response) => {
206 let status = response.status.as_u16();
207 let message = if response.content.is_empty() {
208 format!("status {}", status)
209 } else {
210 response.content.clone()
211 };
212
213 match status {
214 401 | 403 => Error::Auth {
215 message,
216 request_id: None,
217 },
218 429 => Error::RateLimit {
219 retry_after: None,
220 request_id: None,
221 },
222 400..=499 => Error::Client {
223 status,
224 message,
225 request_id: None,
226 },
227 500..=599 => Error::Server {
228 status,
229 message,
230 request_id: None,
231 },
232 _ => Error::Api(message),
233 }
234 }
235 }
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use std::time::Duration;
242
243 #[test]
244 fn test_error_is_retryable() {
245 let rate_limit_error = Error::RateLimit {
247 retry_after: Some(Duration::from_secs(5)),
248 request_id: None,
249 };
250 assert!(rate_limit_error.is_retryable());
251
252 let server_error = Error::Server {
254 status: 500,
255 message: "Internal server error".to_string(),
256 request_id: None,
257 };
258 assert!(server_error.is_retryable());
259
260 let auth_error = Error::Auth {
262 message: "Invalid credentials".to_string(),
263 request_id: None,
264 };
265 assert!(!auth_error.is_retryable());
266
267 let client_error = Error::Client {
269 status: 400,
270 message: "Bad request".to_string(),
271 request_id: None,
272 };
273 assert!(!client_error.is_retryable());
274
275 let validation_error = Error::Validation("Invalid input".to_string());
277 assert!(!validation_error.is_retryable());
278 }
279
280 #[test]
281 fn test_error_retry_after() {
282 let rate_limit_error = Error::RateLimit {
284 retry_after: Some(Duration::from_secs(10)),
285 request_id: None,
286 };
287 assert_eq!(
288 rate_limit_error.retry_after(),
289 Some(Duration::from_secs(10))
290 );
291
292 let server_error = Error::Server {
294 status: 503,
295 message: "Service unavailable".to_string(),
296 request_id: None,
297 };
298 assert_eq!(server_error.retry_after(), Some(Duration::from_secs(5)));
299
300 let auth_error = Error::Auth {
302 message: "Unauthorized".to_string(),
303 request_id: None,
304 };
305 assert_eq!(auth_error.retry_after(), None);
306 }
307
308 #[test]
309 fn test_ingestion_response_success() {
310 let response = IngestionResponse {
311 success_ids: vec!["id1".to_string(), "id2".to_string()],
312 failures: vec![],
313 success_count: 2,
314 failure_count: 0,
315 };
316
317 assert!(response.is_success());
318 assert!(!response.is_partial_failure());
319 assert!(response.to_error().is_none());
320 }
321
322 #[test]
323 fn test_ingestion_response_partial_failure() {
324 let response = IngestionResponse {
325 success_ids: vec!["id1".to_string()],
326 failures: vec![EventError {
327 event_id: "id2".to_string(),
328 message: "Validation failed".to_string(),
329 code: Some("VALIDATION_ERROR".to_string()),
330 retryable: false,
331 }],
332 success_count: 1,
333 failure_count: 1,
334 };
335
336 assert!(!response.is_success());
337 assert!(response.is_partial_failure());
338
339 let error = response.to_error().unwrap();
340 match error {
341 Error::PartialFailure {
342 success_count,
343 failure_count,
344 ..
345 } => {
346 assert_eq!(success_count, 1);
347 assert_eq!(failure_count, 1);
348 }
349 _ => panic!("Expected PartialFailure error"),
350 }
351 }
352
353 #[test]
354 fn test_ingestion_response_total_failure() {
355 let response = IngestionResponse {
356 success_ids: vec![],
357 failures: vec![
358 EventError {
359 event_id: "id1".to_string(),
360 message: "Auth failed".to_string(),
361 code: Some("AUTH_ERROR".to_string()),
362 retryable: false,
363 },
364 EventError {
365 event_id: "id2".to_string(),
366 message: "Rate limited".to_string(),
367 code: Some("RATE_LIMIT".to_string()),
368 retryable: true,
369 },
370 ],
371 success_count: 0,
372 failure_count: 2,
373 };
374
375 assert!(!response.is_success());
376 assert!(!response.is_partial_failure()); assert!(response.to_error().is_some());
378 }
379
380 #[test]
381 fn test_event_error_display() {
382 let error = EventError {
383 event_id: "test-id".to_string(),
384 message: "Something went wrong".to_string(),
385 code: Some("TEST_ERROR".to_string()),
386 retryable: true,
387 };
388
389 let display = format!("{}", error);
390 assert!(display.contains("test-id"));
391 assert!(display.contains("Something went wrong"));
392 assert!(display.contains("TEST_ERROR"));
393 assert!(display.contains("retryable"));
394 }
395
396 #[test]
397 fn test_event_error_display_minimal() {
398 let error = EventError {
399 event_id: "minimal-id".to_string(),
400 message: "Minimal error".to_string(),
401 code: None,
402 retryable: false,
403 };
404
405 let display = format!("{}", error);
406 assert!(display.contains("minimal-id"));
407 assert!(display.contains("Minimal error"));
408 assert!(!display.contains("retryable"));
409 }
410}