Skip to main content

conduit_rs/
error.rs

1use serde_json::Value;
2use std::error::Error as StdError;
3use std::time::Duration;
4use thiserror::Error;
5
6type BoxError = Box<dyn StdError + Send + Sync>;
7
8/// Result alias used by the Conduit Rust SDK.
9pub type Result<T> = std::result::Result<T, ConduitError>;
10
11#[derive(Debug, Error)]
12#[error("{message}")]
13/// Shared error context used by several SDK error variants.
14pub struct ErrorContext {
15    message: String,
16    code: String,
17    request_id: Option<String>,
18    #[source]
19    source: Option<BoxError>,
20}
21
22impl ErrorContext {
23    fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
24        Self {
25            message: message.into(),
26            code: code.into(),
27            request_id: None,
28            source: None,
29        }
30    }
31
32    fn with_request_id(mut self, request_id: Option<String>) -> Self {
33        self.request_id = request_id;
34        self
35    }
36
37    fn with_source<E>(mut self, source: E) -> Self
38    where
39        E: StdError + Send + Sync + 'static,
40    {
41        self.source = Some(Box::new(source));
42        self
43    }
44}
45
46#[derive(Debug, Error)]
47#[error("{message}")]
48/// Source-specific context used by upload and remote fetch errors.
49pub struct SourceContext {
50    message: String,
51    code: String,
52    request_id: Option<String>,
53    url: Option<String>,
54    status: Option<u16>,
55    #[source]
56    source: Option<BoxError>,
57}
58
59impl SourceContext {
60    fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
61        Self {
62            message: message.into(),
63            code: code.into(),
64            request_id: None,
65            url: None,
66            status: None,
67            source: None,
68        }
69    }
70
71    fn with_remote(mut self, url: Option<String>, status: Option<u16>) -> Self {
72        self.url = url;
73        self.status = status;
74        self
75    }
76
77    fn with_source<E>(mut self, source: E) -> Self
78    where
79        E: StdError + Send + Sync + 'static,
80    {
81        self.source = Some(Box::new(source));
82        self
83    }
84}
85
86#[derive(Debug, Error)]
87#[error("{message}")]
88/// API error context shared by auth, validation, and generic API failures.
89pub struct ApiContext {
90    message: String,
91    code: String,
92    request_id: Option<String>,
93    status: u16,
94    details: Option<Box<Value>>,
95}
96
97impl ApiContext {
98    fn new(
99        status: u16,
100        request_id: Option<String>,
101        message: impl Into<String>,
102        code: impl Into<String>,
103        details: Option<Value>,
104    ) -> Self {
105        Self {
106            message: message.into(),
107            code: code.into(),
108            request_id,
109            status,
110            details: details.map(Box::new),
111        }
112    }
113}
114
115#[derive(Debug, Error)]
116#[error("{message}")]
117/// Rate-limit error context.
118pub struct RateLimitContext {
119    message: String,
120    code: String,
121    request_id: Option<String>,
122    status: u16,
123    details: Option<Box<Value>>,
124    retry_after: Option<Duration>,
125}
126
127#[derive(Debug, Error)]
128#[error("{message}")]
129/// Insufficient credits error context.
130pub struct CreditsContext {
131    message: String,
132    code: String,
133    request_id: Option<String>,
134    status: u16,
135    details: Option<Box<Value>>,
136    required: f64,
137    available: f64,
138}
139
140#[derive(Debug, Error)]
141#[error("{message}")]
142/// Job failure or cancellation context.
143pub struct JobContext {
144    message: String,
145    code: String,
146    request_id: Option<String>,
147    job_id: String,
148}
149
150#[derive(Debug, Error)]
151#[error("{message}")]
152/// Stream polling error context.
153pub struct StreamContext {
154    message: String,
155    code: String,
156    request_id: Option<String>,
157    job_id: Option<String>,
158    last_event_id: Option<String>,
159    retry_count: usize,
160    #[source]
161    source: Option<BoxError>,
162}
163
164impl StreamContext {
165    fn new(message: impl Into<String>, job_id: Option<String>) -> Self {
166        Self {
167            message: message.into(),
168            code: "stream_error".into(),
169            request_id: None,
170            job_id,
171            last_event_id: None,
172            retry_count: 0,
173            source: None,
174        }
175    }
176
177    fn with_source<E>(mut self, source: E) -> Self
178    where
179        E: StdError + Send + Sync + 'static,
180    {
181        self.source = Some(Box::new(source));
182        self
183    }
184}
185
186#[derive(Debug, Error)]
187/// Typed error returned by all public SDK operations.
188pub enum ConduitError {
189    #[error(transparent)]
190    /// Generic SDK error.
191    Base(Box<ErrorContext>),
192    #[error(transparent)]
193    /// Client initialization or configuration error.
194    Initialization(Box<ErrorContext>),
195    #[error(transparent)]
196    /// Unsupported runtime capability error.
197    UnsupportedRuntime(Box<ErrorContext>),
198    #[error(transparent)]
199    /// Webhook signature verification error.
200    WebhookVerification(Box<ErrorContext>),
201    #[error(transparent)]
202    /// Invalid local source configuration error.
203    InvalidSource(Box<SourceContext>),
204    #[error(transparent)]
205    /// Remote source fetch error.
206    RemoteFetch(Box<SourceContext>),
207    #[error(transparent)]
208    /// Remote source fetch timeout error.
209    RemoteFetchTimeout(Box<SourceContext>),
210    #[error(transparent)]
211    /// Remote source exceeded the upload size limit.
212    RemoteFetchTooLarge(Box<SourceContext>),
213    #[error(transparent)]
214    /// Generic API error.
215    Api(Box<ApiContext>),
216    #[error(transparent)]
217    /// Authentication or authorization API error.
218    Auth(Box<ApiContext>),
219    #[error(transparent)]
220    /// API validation error.
221    Validation(Box<ApiContext>),
222    #[error(transparent)]
223    /// API rate limit error.
224    RateLimit(Box<RateLimitContext>),
225    #[error(transparent)]
226    /// API insufficient credits error.
227    InsufficientCredits(Box<CreditsContext>),
228    #[error(transparent)]
229    /// Terminal job failure error.
230    JobFailed(Box<JobContext>),
231    #[error(transparent)]
232    /// Terminal job cancellation error.
233    JobCanceled(Box<JobContext>),
234    #[error(transparent)]
235    /// SDK-enforced timeout error.
236    Timeout(Box<ErrorContext>),
237    #[error(transparent)]
238    /// Caller-initiated request cancellation error.
239    RequestAborted(Box<ErrorContext>),
240    #[error(transparent)]
241    /// Polling or streaming helper error.
242    Stream(Box<StreamContext>),
243}
244
245impl ConduitError {
246    pub(crate) fn with_source<E>(self, source: E) -> Self
247    where
248        E: StdError + Send + Sync + 'static,
249    {
250        match self {
251            Self::Base(context) => Self::Base(Box::new(context.with_source(source))),
252            Self::Initialization(context) => {
253                Self::Initialization(Box::new(context.with_source(source)))
254            }
255            Self::UnsupportedRuntime(context) => {
256                Self::UnsupportedRuntime(Box::new(context.with_source(source)))
257            }
258            Self::WebhookVerification(context) => {
259                Self::WebhookVerification(Box::new(context.with_source(source)))
260            }
261            Self::InvalidSource(context) => {
262                Self::InvalidSource(Box::new(context.with_source(source)))
263            }
264            Self::RemoteFetch(context) => Self::RemoteFetch(Box::new(context.with_source(source))),
265            Self::RemoteFetchTimeout(context) => {
266                Self::RemoteFetchTimeout(Box::new(context.with_source(source)))
267            }
268            Self::RemoteFetchTooLarge(context) => {
269                Self::RemoteFetchTooLarge(Box::new(context.with_source(source)))
270            }
271            Self::Timeout(context) => Self::Timeout(Box::new(context.with_source(source))),
272            Self::RequestAborted(context) => {
273                Self::RequestAborted(Box::new(context.with_source(source)))
274            }
275            Self::Stream(context) => Self::Stream(Box::new(context.with_source(source))),
276            other => other,
277        }
278    }
279
280    /// Returns the stable error code associated with this failure.
281    pub fn code(&self) -> &str {
282        match self {
283            Self::Base(context)
284            | Self::Initialization(context)
285            | Self::UnsupportedRuntime(context)
286            | Self::WebhookVerification(context)
287            | Self::Timeout(context)
288            | Self::RequestAborted(context) => &context.code,
289            Self::InvalidSource(context)
290            | Self::RemoteFetch(context)
291            | Self::RemoteFetchTimeout(context)
292            | Self::RemoteFetchTooLarge(context) => &context.code,
293            Self::Api(context) | Self::Auth(context) | Self::Validation(context) => &context.code,
294            Self::RateLimit(context) => &context.code,
295            Self::InsufficientCredits(context) => &context.code,
296            Self::JobFailed(context) | Self::JobCanceled(context) => &context.code,
297            Self::Stream(context) => &context.code,
298        }
299    }
300
301    /// Returns the API request identifier, when available.
302    pub fn request_id(&self) -> Option<&str> {
303        match self {
304            Self::Base(context)
305            | Self::Initialization(context)
306            | Self::UnsupportedRuntime(context)
307            | Self::WebhookVerification(context)
308            | Self::Timeout(context)
309            | Self::RequestAborted(context) => context.request_id.as_deref(),
310            Self::InvalidSource(context)
311            | Self::RemoteFetch(context)
312            | Self::RemoteFetchTimeout(context)
313            | Self::RemoteFetchTooLarge(context) => context.request_id.as_deref(),
314            Self::Api(context) | Self::Auth(context) | Self::Validation(context) => {
315                context.request_id.as_deref()
316            }
317            Self::RateLimit(context) => context.request_id.as_deref(),
318            Self::InsufficientCredits(context) => context.request_id.as_deref(),
319            Self::JobFailed(context) | Self::JobCanceled(context) => context.request_id.as_deref(),
320            Self::Stream(context) => context.request_id.as_deref(),
321        }
322    }
323
324    /// Returns the HTTP status code for API-derived failures.
325    pub fn status(&self) -> Option<u16> {
326        match self {
327            Self::InvalidSource(context)
328            | Self::RemoteFetch(context)
329            | Self::RemoteFetchTimeout(context)
330            | Self::RemoteFetchTooLarge(context) => context.status,
331            Self::Api(context) | Self::Auth(context) | Self::Validation(context) => {
332                Some(context.status)
333            }
334            Self::RateLimit(context) => Some(context.status),
335            Self::InsufficientCredits(context) => Some(context.status),
336            _ => None,
337        }
338    }
339
340    pub(crate) fn base(message: impl Into<String>, code: impl Into<String>) -> Self {
341        Self::Base(Box::new(ErrorContext::new(message, code)))
342    }
343
344    pub(crate) fn invalid_request(message: impl Into<String>) -> Self {
345        Self::base(message, "invalid_request")
346    }
347
348    pub(crate) fn invalid_response(message: impl Into<String>) -> Self {
349        Self::base(message, "invalid_response")
350    }
351
352    /// Creates an error used when a webhook payload is malformed after signature verification.
353    pub fn invalid_webhook_payload(message: impl Into<String>) -> Self {
354        Self::base(message, "invalid_webhook_payload")
355    }
356
357    pub(crate) fn initialization(message: impl Into<String>, code: impl Into<String>) -> Self {
358        Self::Initialization(Box::new(ErrorContext::new(message, code)))
359    }
360
361    pub(crate) fn webhook(message: impl Into<String>, code: impl Into<String>) -> Self {
362        Self::WebhookVerification(Box::new(ErrorContext::new(message, code)))
363    }
364
365    pub(crate) fn invalid_source(message: impl Into<String>) -> Self {
366        Self::InvalidSource(Box::new(SourceContext::new(message, "invalid_source")))
367    }
368
369    pub(crate) fn source_too_large(message: impl Into<String>) -> Self {
370        Self::InvalidSource(Box::new(SourceContext::new(message, "source_too_large")))
371    }
372
373    pub(crate) fn remote_fetch(
374        message: impl Into<String>,
375        code: impl Into<String>,
376        url: Option<String>,
377        status: Option<u16>,
378    ) -> Self {
379        Self::RemoteFetch(Box::new(
380            SourceContext::new(message, code).with_remote(url, status),
381        ))
382    }
383
384    pub(crate) fn remote_fetch_timeout(url: Option<String>, status: Option<u16>) -> Self {
385        Self::RemoteFetchTimeout(Box::new(
386            SourceContext::new("remote fetch timed out", "remote_fetch_timeout")
387                .with_remote(url, status),
388        ))
389    }
390
391    pub(crate) fn remote_fetch_too_large(url: Option<String>, status: Option<u16>) -> Self {
392        Self::RemoteFetchTooLarge(Box::new(
393            SourceContext::new("source.url exceeds upload size limit", "source_too_large")
394                .with_remote(url, status),
395        ))
396    }
397
398    pub(crate) fn api(
399        status: u16,
400        request_id: Option<String>,
401        message: impl Into<String>,
402        code: impl Into<String>,
403        details: Option<Value>,
404        retry_after: Option<Duration>,
405    ) -> Self {
406        let message = message.into();
407        let code = code.into();
408        match status {
409            401 | 403 => Self::Auth(Box::new(ApiContext::new(
410                status, request_id, message, code, details,
411            ))),
412            402 => {
413                let (required, available) = read_credit_values(details.as_ref());
414                Self::InsufficientCredits(Box::new(CreditsContext {
415                    message,
416                    code,
417                    request_id,
418                    status,
419                    details: details.map(Box::new),
420                    required,
421                    available,
422                }))
423            }
424            422 => Self::Validation(Box::new(ApiContext::new(
425                status, request_id, message, code, details,
426            ))),
427            429 => Self::RateLimit(Box::new(RateLimitContext {
428                message,
429                code,
430                request_id,
431                status,
432                details: details.map(Box::new),
433                retry_after,
434            })),
435            _ => Self::Api(Box::new(ApiContext::new(
436                status, request_id, message, code, details,
437            ))),
438        }
439    }
440
441    pub(crate) fn job_failed(
442        job_id: impl Into<String>,
443        request_id: Option<String>,
444        code: impl Into<String>,
445        message: impl Into<String>,
446    ) -> Self {
447        Self::JobFailed(Box::new(JobContext {
448            message: message.into(),
449            code: code.into(),
450            request_id,
451            job_id: job_id.into(),
452        }))
453    }
454
455    pub(crate) fn job_canceled(job_id: impl Into<String>, request_id: Option<String>) -> Self {
456        let job_id = job_id.into();
457        Self::JobCanceled(Box::new(JobContext {
458            message: format!("job {job_id} canceled"),
459            code: "job_canceled".into(),
460            request_id,
461            job_id,
462        }))
463    }
464
465    pub(crate) fn timeout(message: impl Into<String>, request_id: Option<String>) -> Self {
466        Self::Timeout(Box::new(
467            ErrorContext::new(message, "timeout").with_request_id(request_id),
468        ))
469    }
470
471    pub(crate) fn request_aborted(request_id: Option<String>) -> Self {
472        Self::RequestAborted(Box::new(
473            ErrorContext::new("request aborted by caller", "request_aborted")
474                .with_request_id(request_id),
475        ))
476    }
477
478    pub(crate) fn stream(message: impl Into<String>, job_id: Option<String>) -> Self {
479        Self::Stream(Box::new(StreamContext::new(message, job_id)))
480    }
481}
482
483pub(crate) fn rate_limit_retry_after(error: &ConduitError) -> Option<Duration> {
484    let ConduitError::RateLimit(context) = error else {
485        return None;
486    };
487    context.retry_after
488}
489
490pub(crate) fn is_retryable_api_error(error: &ConduitError) -> bool {
491    let ConduitError::Api(context) = error else {
492        return false;
493    };
494    context.status >= 500
495}
496
497pub(crate) fn is_transport_error(error: &ConduitError) -> bool {
498    let ConduitError::Base(context) = error else {
499        return false;
500    };
501    context.code == "transport_error"
502}
503
504fn read_credit_values(details: Option<&Value>) -> (f64, f64) {
505    let Some(Value::Object(map)) = details else {
506        return (0.0, 0.0);
507    };
508    let required = map
509        .get("required")
510        .and_then(Value::as_f64)
511        .unwrap_or_default();
512    let available = map
513        .get("available")
514        .and_then(Value::as_f64)
515        .unwrap_or_default();
516    (required, available)
517}