Skip to main content

faucet_core/
error.rs

1//! Error types for faucet-stream.
2
3use std::time::Duration;
4use thiserror::Error;
5
6/// All possible errors returned by faucet-stream.
7#[derive(Debug, Error)]
8pub enum FaucetError {
9    #[error("HTTP error: {0}")]
10    Http(#[from] reqwest::Error),
11
12    /// An HTTP response with a non-success status code.
13    ///
14    /// Contains the status code, URL, and (truncated) response body for
15    /// debugging.  Whether this error is retriable depends on the status code
16    /// — see [`FaucetError::is_retriable`].
17    #[error("HTTP {status} from {url}: {body}")]
18    HttpStatus {
19        status: u16,
20        url: String,
21        body: String,
22    },
23
24    #[error("JSON error: {0}")]
25    Json(#[from] serde_json::Error),
26
27    #[error("JSONPath error: {0}")]
28    JsonPath(String),
29
30    #[error("Auth error: {0}")]
31    Auth(String),
32
33    /// The server responded with HTTP 429 Too Many Requests.
34    /// The inner value is the duration to wait before retrying,
35    /// parsed from the `Retry-After` response header (default: 60 s).
36    #[error("Rate limited: retry after {0:?}")]
37    RateLimited(Duration),
38
39    /// A URL could not be constructed or parsed.
40    #[error("URL error: {0}")]
41    Url(String),
42
43    /// A record transform could not be compiled or applied (e.g. invalid regex).
44    #[error("Transform error: {0}")]
45    Transform(String),
46
47    /// A configuration or validation error (e.g. invalid endpoint, missing descriptor).
48    #[error("Config error: {0}")]
49    Config(String),
50
51    /// A source operation failed (e.g. database query error, file read error).
52    #[error("Source error: {0}")]
53    Source(String),
54
55    /// A sink operation failed (e.g. BigQuery insert error).
56    #[error("Sink error: {0}")]
57    Sink(String),
58
59    /// A custom error from a third-party connector.
60    ///
61    /// Use this to wrap your own error types without losing the error chain:
62    /// ```rust
63    /// use faucet_core::FaucetError;
64    /// let err = FaucetError::Custom(Box::new(std::io::Error::new(
65    ///     std::io::ErrorKind::Other,
66    ///     "my connector failed",
67    /// )));
68    /// ```
69    #[error("Connector error: {0}")]
70    Custom(#[from] Box<dyn std::error::Error + Send + Sync>),
71}
72
73impl FaucetError {
74    /// Whether this error is transient and the request should be retried.
75    ///
76    /// Retriable errors:
77    /// - Network / connection errors (`Http` from reqwest)
78    /// - Server errors (5xx status codes)
79    /// - Rate limiting (429 — handled separately with `Retry-After`)
80    ///
81    /// Non-retriable errors:
82    /// - Client errors (4xx except 429)
83    /// - JSON parse / JSONPath / auth / transform errors
84    pub fn is_retriable(&self) -> bool {
85        match self {
86            // reqwest errors: connection timeouts, DNS failures, etc. are retriable.
87            FaucetError::Http(e) => {
88                // If it's a status error that leaked through, check the code.
89                if let Some(status) = e.status() {
90                    status.is_server_error()
91                } else {
92                    // Connection errors, timeouts, etc.
93                    true
94                }
95            }
96            FaucetError::HttpStatus { status, .. } => *status >= 500,
97            FaucetError::RateLimited(_) => true,
98            _ => false,
99        }
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106
107    #[test]
108    fn http_status_5xx_is_retriable() {
109        let err = FaucetError::HttpStatus {
110            status: 500,
111            url: "https://example.com".into(),
112            body: "Internal Server Error".into(),
113        };
114        assert!(err.is_retriable());
115
116        let err = FaucetError::HttpStatus {
117            status: 503,
118            url: "https://example.com".into(),
119            body: "".into(),
120        };
121        assert!(err.is_retriable());
122    }
123
124    #[test]
125    fn http_status_4xx_is_not_retriable() {
126        let err = FaucetError::HttpStatus {
127            status: 400,
128            url: "https://example.com".into(),
129            body: "Bad Request".into(),
130        };
131        assert!(!err.is_retriable());
132
133        let err = FaucetError::HttpStatus {
134            status: 404,
135            url: "https://example.com".into(),
136            body: "".into(),
137        };
138        assert!(!err.is_retriable());
139    }
140
141    #[test]
142    fn rate_limited_is_retriable() {
143        let err = FaucetError::RateLimited(Duration::from_secs(30));
144        assert!(err.is_retriable());
145    }
146
147    #[test]
148    fn json_error_is_not_retriable() {
149        let serde_err = serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
150        let err = FaucetError::Json(serde_err);
151        assert!(!err.is_retriable());
152    }
153
154    #[test]
155    fn jsonpath_error_is_not_retriable() {
156        let err = FaucetError::JsonPath("bad path".into());
157        assert!(!err.is_retriable());
158    }
159
160    #[test]
161    fn auth_error_is_not_retriable() {
162        let err = FaucetError::Auth("invalid token".into());
163        assert!(!err.is_retriable());
164    }
165
166    #[test]
167    fn url_error_is_not_retriable() {
168        let err = FaucetError::Url("bad url".into());
169        assert!(!err.is_retriable());
170    }
171
172    #[test]
173    fn transform_error_is_not_retriable() {
174        let err = FaucetError::Transform("bad regex".into());
175        assert!(!err.is_retriable());
176    }
177
178    #[test]
179    fn http_status_display_includes_url_and_body() {
180        let err = FaucetError::HttpStatus {
181            status: 422,
182            url: "https://api.example.com/test".into(),
183            body: "Unprocessable Entity".into(),
184        };
185        let msg = err.to_string();
186        assert!(msg.contains("422"));
187        assert!(msg.contains("https://api.example.com/test"));
188        assert!(msg.contains("Unprocessable Entity"));
189    }
190
191    #[test]
192    fn config_error_is_not_retriable() {
193        let err = FaucetError::Config("bad endpoint".into());
194        assert!(!err.is_retriable());
195    }
196
197    #[test]
198    fn config_error_display() {
199        let err = FaucetError::Config("missing descriptor".into());
200        assert_eq!(err.to_string(), "Config error: missing descriptor");
201    }
202
203    #[test]
204    fn source_error_is_not_retriable() {
205        let err = FaucetError::Source("query failed".into());
206        assert!(!err.is_retriable());
207    }
208
209    #[test]
210    fn source_error_display() {
211        let err = FaucetError::Source("connection refused".into());
212        assert_eq!(err.to_string(), "Source error: connection refused");
213    }
214
215    #[test]
216    fn custom_error_is_not_retriable() {
217        let err = FaucetError::Custom(Box::new(std::io::Error::other("custom failure")));
218        assert!(!err.is_retriable());
219    }
220
221    #[test]
222    fn custom_error_display() {
223        let err = FaucetError::Custom(Box::new(std::io::Error::other("custom failure")));
224        assert_eq!(err.to_string(), "Connector error: custom failure");
225    }
226
227    #[test]
228    fn custom_error_from_boxed() {
229        let io_err = std::io::Error::other("file missing");
230        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(io_err);
231        let err: FaucetError = boxed.into();
232        assert!(matches!(err, FaucetError::Custom(_)));
233    }
234
235    #[test]
236    fn sink_error_is_not_retriable() {
237        let err = FaucetError::Sink("BigQuery insert failed".into());
238        assert!(!err.is_retriable());
239    }
240
241    #[test]
242    fn sink_error_display() {
243        let err = FaucetError::Sink("connection refused".into());
244        assert_eq!(err.to_string(), "Sink error: connection refused");
245    }
246}