Skip to main content

faucet_core/
error.rs

1//! Error types for faucet-stream.
2
3use std::time::Duration;
4use thiserror::Error;
5
6/// All possible errors returned by faucet-stream.
7#[derive(Debug, Error)]
8pub enum FaucetError {
9    #[error("HTTP error: {0}")]
10    Http(#[from] reqwest::Error),
11
12    /// An HTTP response with a non-success status code.
13    ///
14    /// Contains the status code, URL, and (truncated) response body for
15    /// debugging.  Whether this error is retriable depends on the status code
16    /// — see [`FaucetError::is_retriable`].
17    #[error("HTTP {status} from {url}: {body}")]
18    HttpStatus {
19        status: u16,
20        url: String,
21        body: String,
22    },
23
24    #[error("JSON error: {0}")]
25    Json(#[from] serde_json::Error),
26
27    #[error("JSONPath error: {0}")]
28    JsonPath(String),
29
30    #[error("Auth error: {0}")]
31    Auth(String),
32
33    /// The server responded with HTTP 429 Too Many Requests.
34    /// The inner value is the duration to wait before retrying,
35    /// parsed from the `Retry-After` response header (default: 60 s).
36    #[error("Rate limited: retry after {0:?}")]
37    RateLimited(Duration),
38
39    /// A URL could not be constructed or parsed.
40    #[error("URL error: {0}")]
41    Url(String),
42
43    /// A record transform could not be compiled or applied (e.g. invalid regex).
44    #[error("Transform error: {0}")]
45    Transform(String),
46
47    /// A configuration or validation error (e.g. invalid endpoint, missing descriptor).
48    #[error("Config error: {0}")]
49    Config(String),
50
51    /// A source operation failed (e.g. database query error, file read error).
52    #[error("Source error: {0}")]
53    Source(String),
54
55    /// A sink operation failed (e.g. BigQuery insert error).
56    #[error("Sink error: {0}")]
57    Sink(String),
58
59    /// A data-quality check failed under an `abort` policy.
60    #[error("Quality check '{check}' failed: {message}")]
61    QualityFailure { check: String, message: String },
62
63    /// A state-store operation failed (read/write/delete of a replication
64    /// bookmark, checkpoint, or other persisted pipeline state).
65    #[error("State error: {0}")]
66    State(String),
67
68    /// A custom error from a third-party connector.
69    ///
70    /// Use this to wrap your own error types without losing the error chain:
71    /// ```rust
72    /// use faucet_core::FaucetError;
73    /// let err = FaucetError::Custom(Box::new(std::io::Error::new(
74    ///     std::io::ErrorKind::Other,
75    ///     "my connector failed",
76    /// )));
77    /// ```
78    #[error("Connector error: {0}")]
79    Custom(#[from] Box<dyn std::error::Error + Send + Sync>),
80}
81
82impl FaucetError {
83    /// Whether this error is transient and the request should be retried.
84    ///
85    /// Retriable errors:
86    /// - Network / connection errors (`Http` from reqwest)
87    /// - Server errors (5xx status codes)
88    /// - Rate limiting (429 — handled separately with `Retry-After`)
89    ///
90    /// Non-retriable errors:
91    /// - Client errors (4xx except 429)
92    /// - JSON parse / JSONPath / auth / transform errors
93    pub fn is_retriable(&self) -> bool {
94        match self {
95            // reqwest errors: connection timeouts, DNS failures, etc. are retriable.
96            FaucetError::Http(e) => {
97                // If it's a status error that leaked through, check the code.
98                if let Some(status) = e.status() {
99                    status.is_server_error()
100                } else {
101                    // Connection errors, timeouts, etc.
102                    true
103                }
104            }
105            // 5xx are retriable; 429 (Too Many Requests) is too — sources that
106            // surface a rate-limit as a plain HttpStatus rather than the
107            // dedicated RateLimited variant (XML, GraphQL) would otherwise abort
108            // on the first 429 (audit #146 H3).
109            FaucetError::HttpStatus { status, .. } => *status >= 500 || *status == 429,
110            FaucetError::RateLimited(_) => true,
111            _ => false,
112        }
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119
120    #[test]
121    fn http_status_5xx_is_retriable() {
122        let err = FaucetError::HttpStatus {
123            status: 500,
124            url: "https://example.com".into(),
125            body: "Internal Server Error".into(),
126        };
127        assert!(err.is_retriable());
128
129        let err = FaucetError::HttpStatus {
130            status: 503,
131            url: "https://example.com".into(),
132            body: "".into(),
133        };
134        assert!(err.is_retriable());
135    }
136
137    #[test]
138    fn http_status_4xx_is_not_retriable() {
139        let err = FaucetError::HttpStatus {
140            status: 400,
141            url: "https://example.com".into(),
142            body: "Bad Request".into(),
143        };
144        assert!(!err.is_retriable());
145
146        let err = FaucetError::HttpStatus {
147            status: 404,
148            url: "https://example.com".into(),
149            body: "".into(),
150        };
151        assert!(!err.is_retriable());
152    }
153
154    #[test]
155    fn http_status_429_is_retriable() {
156        // H3 (audit #146): a 429 surfaced as a plain HttpStatus (XML/GraphQL
157        // sources) must be retriable, not aborted on the first hit.
158        let err = FaucetError::HttpStatus {
159            status: 429,
160            url: "https://example.com".into(),
161            body: "Too Many Requests".into(),
162        };
163        assert!(err.is_retriable());
164    }
165
166    #[test]
167    fn rate_limited_is_retriable() {
168        let err = FaucetError::RateLimited(Duration::from_secs(30));
169        assert!(err.is_retriable());
170    }
171
172    #[test]
173    fn json_error_is_not_retriable() {
174        let serde_err = serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
175        let err = FaucetError::Json(serde_err);
176        assert!(!err.is_retriable());
177    }
178
179    #[test]
180    fn jsonpath_error_is_not_retriable() {
181        let err = FaucetError::JsonPath("bad path".into());
182        assert!(!err.is_retriable());
183    }
184
185    #[test]
186    fn auth_error_is_not_retriable() {
187        let err = FaucetError::Auth("invalid token".into());
188        assert!(!err.is_retriable());
189    }
190
191    #[test]
192    fn url_error_is_not_retriable() {
193        let err = FaucetError::Url("bad url".into());
194        assert!(!err.is_retriable());
195    }
196
197    #[test]
198    fn transform_error_is_not_retriable() {
199        let err = FaucetError::Transform("bad regex".into());
200        assert!(!err.is_retriable());
201    }
202
203    #[test]
204    fn http_status_display_includes_url_and_body() {
205        let err = FaucetError::HttpStatus {
206            status: 422,
207            url: "https://api.example.com/test".into(),
208            body: "Unprocessable Entity".into(),
209        };
210        let msg = err.to_string();
211        assert!(msg.contains("422"));
212        assert!(msg.contains("https://api.example.com/test"));
213        assert!(msg.contains("Unprocessable Entity"));
214    }
215
216    #[test]
217    fn config_error_is_not_retriable() {
218        let err = FaucetError::Config("bad endpoint".into());
219        assert!(!err.is_retriable());
220    }
221
222    #[test]
223    fn config_error_display() {
224        let err = FaucetError::Config("missing descriptor".into());
225        assert_eq!(err.to_string(), "Config error: missing descriptor");
226    }
227
228    #[test]
229    fn source_error_is_not_retriable() {
230        let err = FaucetError::Source("query failed".into());
231        assert!(!err.is_retriable());
232    }
233
234    #[test]
235    fn source_error_display() {
236        let err = FaucetError::Source("connection refused".into());
237        assert_eq!(err.to_string(), "Source error: connection refused");
238    }
239
240    #[test]
241    fn custom_error_is_not_retriable() {
242        let err = FaucetError::Custom(Box::new(std::io::Error::other("custom failure")));
243        assert!(!err.is_retriable());
244    }
245
246    #[test]
247    fn custom_error_display() {
248        let err = FaucetError::Custom(Box::new(std::io::Error::other("custom failure")));
249        assert_eq!(err.to_string(), "Connector error: custom failure");
250    }
251
252    #[test]
253    fn custom_error_from_boxed() {
254        let io_err = std::io::Error::other("file missing");
255        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(io_err);
256        let err: FaucetError = boxed.into();
257        assert!(matches!(err, FaucetError::Custom(_)));
258    }
259
260    #[test]
261    fn sink_error_is_not_retriable() {
262        let err = FaucetError::Sink("BigQuery insert failed".into());
263        assert!(!err.is_retriable());
264    }
265
266    #[test]
267    fn sink_error_display() {
268        let err = FaucetError::Sink("connection refused".into());
269        assert_eq!(err.to_string(), "Sink error: connection refused");
270    }
271
272    #[test]
273    fn quality_failure_is_not_retriable_and_displays() {
274        let err = FaucetError::QualityFailure {
275            check: "not_null".into(),
276            message: "field 'user_id' was null".into(),
277        };
278        assert!(!err.is_retriable());
279        let s = err.to_string();
280        assert!(s.contains("not_null"));
281        assert!(s.contains("user_id"));
282    }
283}