1use serde_json::Value;
2use std::error::Error as StdError;
3use std::time::Duration;
4use thiserror::Error;
5
/// Type-erased, thread-safe error used for the optional `source` slot of the
/// context structs in this module.
type BoxError = Box<dyn StdError + Send + Sync + 'static>;
7
8pub type Result<T> = std::result::Result<T, ConduitError>;
10
11#[derive(Debug, Error)]
12#[error("{message}")]
13pub struct ErrorContext {
15 message: String,
16 code: String,
17 request_id: Option<String>,
18 #[source]
19 source: Option<BoxError>,
20}
21
22impl ErrorContext {
23 fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
24 Self {
25 message: message.into(),
26 code: code.into(),
27 request_id: None,
28 source: None,
29 }
30 }
31
32 fn with_request_id(mut self, request_id: Option<String>) -> Self {
33 self.request_id = request_id;
34 self
35 }
36
37 fn with_source<E>(mut self, source: E) -> Self
38 where
39 E: StdError + Send + Sync + 'static,
40 {
41 self.source = Some(Box::new(source));
42 self
43 }
44}
45
46#[derive(Debug, Error)]
47#[error("{message}")]
48pub struct SourceContext {
50 message: String,
51 code: String,
52 request_id: Option<String>,
53 url: Option<String>,
54 status: Option<u16>,
55 #[source]
56 source: Option<BoxError>,
57}
58
59impl SourceContext {
60 fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
61 Self {
62 message: message.into(),
63 code: code.into(),
64 request_id: None,
65 url: None,
66 status: None,
67 source: None,
68 }
69 }
70
71 fn with_remote(mut self, url: Option<String>, status: Option<u16>) -> Self {
72 self.url = url;
73 self.status = status;
74 self
75 }
76
77 fn with_source<E>(mut self, source: E) -> Self
78 where
79 E: StdError + Send + Sync + 'static,
80 {
81 self.source = Some(Box::new(source));
82 self
83 }
84}
85
86#[derive(Debug, Error)]
87#[error("{message}")]
88pub struct ApiContext {
90 message: String,
91 code: String,
92 request_id: Option<String>,
93 status: u16,
94 details: Option<Box<Value>>,
95}
96
97impl ApiContext {
98 fn new(
99 status: u16,
100 request_id: Option<String>,
101 message: impl Into<String>,
102 code: impl Into<String>,
103 details: Option<Value>,
104 ) -> Self {
105 Self {
106 message: message.into(),
107 code: code.into(),
108 request_id,
109 status,
110 details: details.map(Box::new),
111 }
112 }
113}
114
115#[derive(Debug, Error)]
116#[error("{message}")]
117pub struct RateLimitContext {
119 message: String,
120 code: String,
121 request_id: Option<String>,
122 status: u16,
123 details: Option<Box<Value>>,
124 retry_after: Option<Duration>,
125}
126
127#[derive(Debug, Error)]
128#[error("{message}")]
129pub struct CreditsContext {
131 message: String,
132 code: String,
133 request_id: Option<String>,
134 status: u16,
135 details: Option<Box<Value>>,
136 required: f64,
137 available: f64,
138}
139
140#[derive(Debug, Error)]
141#[error("{message}")]
142pub struct JobContext {
144 message: String,
145 code: String,
146 request_id: Option<String>,
147 job_id: String,
148}
149
150#[derive(Debug, Error)]
151#[error("{message}")]
152pub struct StreamContext {
154 message: String,
155 code: String,
156 request_id: Option<String>,
157 job_id: Option<String>,
158 last_event_id: Option<String>,
159 retry_count: usize,
160 #[source]
161 source: Option<BoxError>,
162}
163
164impl StreamContext {
165 fn new(message: impl Into<String>, job_id: Option<String>) -> Self {
166 Self {
167 message: message.into(),
168 code: "stream_error".into(),
169 request_id: None,
170 job_id,
171 last_event_id: None,
172 retry_count: 0,
173 source: None,
174 }
175 }
176
177 fn with_source<E>(mut self, source: E) -> Self
178 where
179 E: StdError + Send + Sync + 'static,
180 {
181 self.source = Some(Box::new(source));
182 self
183 }
184}
185
186#[derive(Debug, Error)]
187pub enum ConduitError {
189 #[error(transparent)]
190 Base(Box<ErrorContext>),
192 #[error(transparent)]
193 Initialization(Box<ErrorContext>),
195 #[error(transparent)]
196 UnsupportedRuntime(Box<ErrorContext>),
198 #[error(transparent)]
199 WebhookVerification(Box<ErrorContext>),
201 #[error(transparent)]
202 InvalidSource(Box<SourceContext>),
204 #[error(transparent)]
205 RemoteFetch(Box<SourceContext>),
207 #[error(transparent)]
208 RemoteFetchTimeout(Box<SourceContext>),
210 #[error(transparent)]
211 RemoteFetchTooLarge(Box<SourceContext>),
213 #[error(transparent)]
214 Api(Box<ApiContext>),
216 #[error(transparent)]
217 Auth(Box<ApiContext>),
219 #[error(transparent)]
220 Validation(Box<ApiContext>),
222 #[error(transparent)]
223 RateLimit(Box<RateLimitContext>),
225 #[error(transparent)]
226 InsufficientCredits(Box<CreditsContext>),
228 #[error(transparent)]
229 JobFailed(Box<JobContext>),
231 #[error(transparent)]
232 JobCanceled(Box<JobContext>),
234 #[error(transparent)]
235 Timeout(Box<ErrorContext>),
237 #[error(transparent)]
238 RequestAborted(Box<ErrorContext>),
240 #[error(transparent)]
241 Stream(Box<StreamContext>),
243}
244
245impl ConduitError {
246 pub(crate) fn with_source<E>(self, source: E) -> Self
247 where
248 E: StdError + Send + Sync + 'static,
249 {
250 match self {
251 Self::Base(context) => Self::Base(Box::new(context.with_source(source))),
252 Self::Initialization(context) => {
253 Self::Initialization(Box::new(context.with_source(source)))
254 }
255 Self::UnsupportedRuntime(context) => {
256 Self::UnsupportedRuntime(Box::new(context.with_source(source)))
257 }
258 Self::WebhookVerification(context) => {
259 Self::WebhookVerification(Box::new(context.with_source(source)))
260 }
261 Self::InvalidSource(context) => {
262 Self::InvalidSource(Box::new(context.with_source(source)))
263 }
264 Self::RemoteFetch(context) => Self::RemoteFetch(Box::new(context.with_source(source))),
265 Self::RemoteFetchTimeout(context) => {
266 Self::RemoteFetchTimeout(Box::new(context.with_source(source)))
267 }
268 Self::RemoteFetchTooLarge(context) => {
269 Self::RemoteFetchTooLarge(Box::new(context.with_source(source)))
270 }
271 Self::Timeout(context) => Self::Timeout(Box::new(context.with_source(source))),
272 Self::RequestAborted(context) => {
273 Self::RequestAborted(Box::new(context.with_source(source)))
274 }
275 Self::Stream(context) => Self::Stream(Box::new(context.with_source(source))),
276 other => other,
277 }
278 }
279
280 pub fn code(&self) -> &str {
282 match self {
283 Self::Base(context)
284 | Self::Initialization(context)
285 | Self::UnsupportedRuntime(context)
286 | Self::WebhookVerification(context)
287 | Self::Timeout(context)
288 | Self::RequestAborted(context) => &context.code,
289 Self::InvalidSource(context)
290 | Self::RemoteFetch(context)
291 | Self::RemoteFetchTimeout(context)
292 | Self::RemoteFetchTooLarge(context) => &context.code,
293 Self::Api(context) | Self::Auth(context) | Self::Validation(context) => &context.code,
294 Self::RateLimit(context) => &context.code,
295 Self::InsufficientCredits(context) => &context.code,
296 Self::JobFailed(context) | Self::JobCanceled(context) => &context.code,
297 Self::Stream(context) => &context.code,
298 }
299 }
300
301 pub fn request_id(&self) -> Option<&str> {
303 match self {
304 Self::Base(context)
305 | Self::Initialization(context)
306 | Self::UnsupportedRuntime(context)
307 | Self::WebhookVerification(context)
308 | Self::Timeout(context)
309 | Self::RequestAborted(context) => context.request_id.as_deref(),
310 Self::InvalidSource(context)
311 | Self::RemoteFetch(context)
312 | Self::RemoteFetchTimeout(context)
313 | Self::RemoteFetchTooLarge(context) => context.request_id.as_deref(),
314 Self::Api(context) | Self::Auth(context) | Self::Validation(context) => {
315 context.request_id.as_deref()
316 }
317 Self::RateLimit(context) => context.request_id.as_deref(),
318 Self::InsufficientCredits(context) => context.request_id.as_deref(),
319 Self::JobFailed(context) | Self::JobCanceled(context) => context.request_id.as_deref(),
320 Self::Stream(context) => context.request_id.as_deref(),
321 }
322 }
323
324 pub fn status(&self) -> Option<u16> {
326 match self {
327 Self::InvalidSource(context)
328 | Self::RemoteFetch(context)
329 | Self::RemoteFetchTimeout(context)
330 | Self::RemoteFetchTooLarge(context) => context.status,
331 Self::Api(context) | Self::Auth(context) | Self::Validation(context) => {
332 Some(context.status)
333 }
334 Self::RateLimit(context) => Some(context.status),
335 Self::InsufficientCredits(context) => Some(context.status),
336 _ => None,
337 }
338 }
339
340 pub(crate) fn base(message: impl Into<String>, code: impl Into<String>) -> Self {
341 Self::Base(Box::new(ErrorContext::new(message, code)))
342 }
343
344 pub(crate) fn invalid_request(message: impl Into<String>) -> Self {
345 Self::base(message, "invalid_request")
346 }
347
348 pub(crate) fn invalid_response(message: impl Into<String>) -> Self {
349 Self::base(message, "invalid_response")
350 }
351
352 pub fn invalid_webhook_payload(message: impl Into<String>) -> Self {
354 Self::base(message, "invalid_webhook_payload")
355 }
356
357 pub(crate) fn initialization(message: impl Into<String>, code: impl Into<String>) -> Self {
358 Self::Initialization(Box::new(ErrorContext::new(message, code)))
359 }
360
361 pub(crate) fn webhook(message: impl Into<String>, code: impl Into<String>) -> Self {
362 Self::WebhookVerification(Box::new(ErrorContext::new(message, code)))
363 }
364
365 pub(crate) fn invalid_source(message: impl Into<String>) -> Self {
366 Self::InvalidSource(Box::new(SourceContext::new(message, "invalid_source")))
367 }
368
369 pub(crate) fn source_too_large(message: impl Into<String>) -> Self {
370 Self::InvalidSource(Box::new(SourceContext::new(message, "source_too_large")))
371 }
372
373 pub(crate) fn remote_fetch(
374 message: impl Into<String>,
375 code: impl Into<String>,
376 url: Option<String>,
377 status: Option<u16>,
378 ) -> Self {
379 Self::RemoteFetch(Box::new(
380 SourceContext::new(message, code).with_remote(url, status),
381 ))
382 }
383
384 pub(crate) fn remote_fetch_timeout(url: Option<String>, status: Option<u16>) -> Self {
385 Self::RemoteFetchTimeout(Box::new(
386 SourceContext::new("remote fetch timed out", "remote_fetch_timeout")
387 .with_remote(url, status),
388 ))
389 }
390
391 pub(crate) fn remote_fetch_too_large(url: Option<String>, status: Option<u16>) -> Self {
392 Self::RemoteFetchTooLarge(Box::new(
393 SourceContext::new("source.url exceeds upload size limit", "source_too_large")
394 .with_remote(url, status),
395 ))
396 }
397
398 pub(crate) fn api(
399 status: u16,
400 request_id: Option<String>,
401 message: impl Into<String>,
402 code: impl Into<String>,
403 details: Option<Value>,
404 retry_after: Option<Duration>,
405 ) -> Self {
406 let message = message.into();
407 let code = code.into();
408 match status {
409 401 | 403 => Self::Auth(Box::new(ApiContext::new(
410 status, request_id, message, code, details,
411 ))),
412 402 => {
413 let (required, available) = read_credit_values(details.as_ref());
414 Self::InsufficientCredits(Box::new(CreditsContext {
415 message,
416 code,
417 request_id,
418 status,
419 details: details.map(Box::new),
420 required,
421 available,
422 }))
423 }
424 422 => Self::Validation(Box::new(ApiContext::new(
425 status, request_id, message, code, details,
426 ))),
427 429 => Self::RateLimit(Box::new(RateLimitContext {
428 message,
429 code,
430 request_id,
431 status,
432 details: details.map(Box::new),
433 retry_after,
434 })),
435 _ => Self::Api(Box::new(ApiContext::new(
436 status, request_id, message, code, details,
437 ))),
438 }
439 }
440
441 pub(crate) fn job_failed(
442 job_id: impl Into<String>,
443 request_id: Option<String>,
444 code: impl Into<String>,
445 message: impl Into<String>,
446 ) -> Self {
447 Self::JobFailed(Box::new(JobContext {
448 message: message.into(),
449 code: code.into(),
450 request_id,
451 job_id: job_id.into(),
452 }))
453 }
454
455 pub(crate) fn job_canceled(job_id: impl Into<String>, request_id: Option<String>) -> Self {
456 let job_id = job_id.into();
457 Self::JobCanceled(Box::new(JobContext {
458 message: format!("job {job_id} canceled"),
459 code: "job_canceled".into(),
460 request_id,
461 job_id,
462 }))
463 }
464
465 pub(crate) fn timeout(message: impl Into<String>, request_id: Option<String>) -> Self {
466 Self::Timeout(Box::new(
467 ErrorContext::new(message, "timeout").with_request_id(request_id),
468 ))
469 }
470
471 pub(crate) fn request_aborted(request_id: Option<String>) -> Self {
472 Self::RequestAborted(Box::new(
473 ErrorContext::new("request aborted by caller", "request_aborted")
474 .with_request_id(request_id),
475 ))
476 }
477
478 pub(crate) fn stream(message: impl Into<String>, job_id: Option<String>) -> Self {
479 Self::Stream(Box::new(StreamContext::new(message, job_id)))
480 }
481}
482
483pub(crate) fn rate_limit_retry_after(error: &ConduitError) -> Option<Duration> {
484 let ConduitError::RateLimit(context) = error else {
485 return None;
486 };
487 context.retry_after
488}
489
490pub(crate) fn is_retryable_api_error(error: &ConduitError) -> bool {
491 let ConduitError::Api(context) = error else {
492 return false;
493 };
494 context.status >= 500
495}
496
497pub(crate) fn is_transport_error(error: &ConduitError) -> bool {
498 let ConduitError::Base(context) = error else {
499 return false;
500 };
501 context.code == "transport_error"
502}
503
504fn read_credit_values(details: Option<&Value>) -> (f64, f64) {
505 let Some(Value::Object(map)) = details else {
506 return (0.0, 0.0);
507 };
508 let required = map
509 .get("required")
510 .and_then(Value::as_f64)
511 .unwrap_or_default();
512 let available = map
513 .get("available")
514 .and_then(Value::as_f64)
515 .unwrap_or_default();
516 (required, available)
517}