Skip to main content

net/
error.rs

1//! Error types for the Net event bus.
2
3use thiserror::Error;
4
5/// Errors that can occur during event ingestion.
6#[derive(Debug, Error)]
7pub enum IngestionError {
8    /// Ring buffer is full and backpressure policy rejected the event.
9    #[error("backpressure: ring buffer full")]
10    Backpressure,
11
12    /// Event was dropped due to sampling/decimation policy.
13    #[error("event dropped due to sampling")]
14    Sampled,
15
16    /// Hashed shard id is not in the routing table (e.g. a concurrent
17    /// scale-down removed it, or the shard is still provisioning).
18    /// Previously collapsed into `Backpressure`, which made callers
19    /// apply the wrong remediation (back-off-and-retry on a routing
20    /// miss is futile until the topology stabilizes). Distinct from
21    /// `Backpressure` so callers can distinguish "buffer full" from
22    /// "no destination".
23    #[error("event has no routable shard")]
24    Unrouted,
25
26    /// The event bus has been shut down.
27    #[error("event bus is shutting down")]
28    ShuttingDown,
29
30    /// Serialization failed. Wraps the underlying `serde_json::Error` so
31    /// callers can read the category, line, and column via `source()`.
32    #[error("serialization error: {0}")]
33    Serialization(#[from] serde_json::Error),
34}
35
36/// Errors that can occur in adapter operations.
37#[derive(Debug, Error)]
38pub enum AdapterError {
39    /// Transient error - operation can be retried.
40    #[error("transient error: {0}")]
41    Transient(String),
42
43    /// Fatal error - adapter is in an unrecoverable state.
44    #[error("fatal error: {0}")]
45    Fatal(String),
46
47    /// Backend cannot accept more data - apply backpressure.
48    #[error("backend backpressure")]
49    Backpressure,
50
51    /// Connection error.
52    #[error("connection error: {0}")]
53    Connection(String),
54
55    /// The adapter has been shut down. Distinct from `Connection`
56    /// so callers (and the bus's retry classifier) can tell a "we
57    /// asked this adapter to stop" reject from a transport failure;
58    /// pre-fix every post-shutdown `on_batch` returned `Connection`,
59    /// which classified as non-retryable and silently dropped the
60    /// batch instead of either re-routing or surfacing the shutdown
61    /// as a distinct state to the caller.
62    #[error("adapter is shut down")]
63    Shutdown,
64
65    /// Serialization/deserialization error. Wraps the underlying
66    /// `serde_json::Error` so callers can read the category, line, and
67    /// column via `source()`.
68    #[error("serialization error: {0}")]
69    Serialization(#[from] serde_json::Error),
70}
71
72impl AdapterError {
73    /// Returns true if this error is retryable.
74    #[inline]
75    pub fn is_retryable(&self) -> bool {
76        matches!(self, Self::Transient(_) | Self::Backpressure)
77    }
78
79    /// Returns true if this error is fatal.
80    #[inline]
81    pub fn is_fatal(&self) -> bool {
82        matches!(self, Self::Fatal(_))
83    }
84
85    /// Returns true if this error means the adapter has been shut
86    /// down. Callers can react to this without scraping the
87    /// `Connection` message string.
88    #[inline]
89    pub fn is_shutdown(&self) -> bool {
90        matches!(self, Self::Shutdown)
91    }
92}
93
94/// Errors that can occur during event consumption/polling.
95#[derive(Debug, Error)]
96pub enum ConsumerError {
97    /// Adapter error during polling.
98    #[error("adapter error: {0}")]
99    Adapter(#[from] AdapterError),
100
101    /// Invalid cursor format.
102    #[error("invalid cursor: {0}")]
103    InvalidCursor(String),
104
105    /// Invalid filter specification.
106    #[error("invalid filter: {0}")]
107    InvalidFilter(String),
108}
109
110/// Result type alias for ingestion operations.
111pub type IngestionResult<T> = Result<T, IngestionError>;
112
113/// Result type alias for adapter operations.
114pub type AdapterResult<T> = Result<T, AdapterError>;
115
116/// Result type alias for consumer operations.
117pub type ConsumerResult<T> = Result<T, ConsumerError>;
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a real `serde_json::Error` by parsing input that is not JSON.
    fn make_serde_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
    }

    /// Every variant is classified: only `Transient` and `Backpressure`
    /// are retryable.
    #[test]
    fn test_adapter_error_is_retryable() {
        assert!(AdapterError::Transient("temp".into()).is_retryable());
        assert!(AdapterError::Backpressure.is_retryable());
        assert!(!AdapterError::Fatal("dead".into()).is_retryable());
        assert!(!AdapterError::Connection("refused".into()).is_retryable());
        assert!(!AdapterError::Shutdown.is_retryable());
        assert!(!AdapterError::Serialization(make_serde_error()).is_retryable());
    }

    /// `Shutdown` is its own filterable category — distinct from
    /// generic `Connection` errors so observability tools can tell
    /// "sending to a stopped adapter" from "transport failure".
    #[test]
    fn test_adapter_error_is_shutdown_only_for_shutdown() {
        assert!(AdapterError::Shutdown.is_shutdown());
        assert!(!AdapterError::Connection("refused".into()).is_shutdown());
        assert!(!AdapterError::Fatal("dead".into()).is_shutdown());
        assert!(!AdapterError::Transient("temp".into()).is_shutdown());
        assert!(!AdapterError::Backpressure.is_shutdown());
        assert!(!AdapterError::Serialization(make_serde_error()).is_shutdown());
    }

    /// Regression: BUG_REPORT.md #18 — `Serialization` previously stored
    /// the rendered error string and broke the `source()` chain. Wrapping
    /// `serde_json::Error` directly preserves the category/line/column.
    #[test]
    fn test_serialization_error_preserves_source() {
        let err = AdapterError::Serialization(make_serde_error());
        // The Display impl still renders the inner error.
        assert!(err.to_string().contains("serialization error"));
        // And the source chain points at the original serde_json::Error.
        let source = std::error::Error::source(&err)
            .expect("Serialization variant should expose its source");
        assert!(source.is::<serde_json::Error>());
    }

    /// `IngestionError::Serialization` documents the same `source()`
    /// guarantee as the adapter variant, so it gets the same check.
    #[test]
    fn test_ingestion_serialization_error_preserves_source() {
        let err = IngestionError::Serialization(make_serde_error());
        assert!(err.to_string().contains("serialization error"));
        let source = std::error::Error::source(&err)
            .expect("Serialization variant should expose its source");
        assert!(source.is::<serde_json::Error>());
    }

    /// The `#[from]` conversions let `?` lift a `serde_json::Error` into
    /// either error enum; both must land in the `Serialization` variant.
    #[test]
    fn test_serialization_error_from_serde() {
        let ingestion_err: IngestionError = make_serde_error().into();
        assert!(matches!(ingestion_err, IngestionError::Serialization(_)));

        let adapter_err: AdapterError = make_serde_error().into();
        assert!(matches!(adapter_err, AdapterError::Serialization(_)));
    }

    /// Every variant is classified: only `Fatal` is fatal.
    #[test]
    fn test_adapter_error_is_fatal() {
        assert!(AdapterError::Fatal("dead".into()).is_fatal());
        assert!(!AdapterError::Transient("temp".into()).is_fatal());
        assert!(!AdapterError::Backpressure.is_fatal());
        assert!(!AdapterError::Connection("refused".into()).is_fatal());
        assert!(!AdapterError::Shutdown.is_fatal());
        assert!(!AdapterError::Serialization(make_serde_error()).is_fatal());
    }

    /// Pins the rendered message of each variant, since callers and log
    /// pipelines may match on these strings.
    #[test]
    fn test_error_display() {
        assert_eq!(
            IngestionError::Backpressure.to_string(),
            "backpressure: ring buffer full"
        );
        assert_eq!(
            IngestionError::Sampled.to_string(),
            "event dropped due to sampling"
        );
        assert_eq!(
            IngestionError::Unrouted.to_string(),
            "event has no routable shard"
        );
        assert_eq!(
            IngestionError::ShuttingDown.to_string(),
            "event bus is shutting down"
        );
        assert_eq!(
            AdapterError::Transient("timeout".into()).to_string(),
            "transient error: timeout"
        );
        assert_eq!(
            AdapterError::Fatal("crash".into()).to_string(),
            "fatal error: crash"
        );
        assert_eq!(
            AdapterError::Backpressure.to_string(),
            "backend backpressure"
        );
        assert_eq!(
            AdapterError::Connection("refused".into()).to_string(),
            "connection error: refused"
        );
        assert_eq!(AdapterError::Shutdown.to_string(), "adapter is shut down");
        assert_eq!(
            ConsumerError::Adapter(AdapterError::Backpressure).to_string(),
            "adapter error: backend backpressure"
        );
        assert_eq!(
            ConsumerError::InvalidCursor("abc".into()).to_string(),
            "invalid cursor: abc"
        );
        assert_eq!(
            ConsumerError::InvalidFilter("bad".into()).to_string(),
            "invalid filter: bad"
        );
    }

    /// The `Serialization` message is the fixed prefix followed by the
    /// inner error's own rendering.
    #[test]
    fn test_serialization_error_display_includes_inner() {
        let inner = make_serde_error();
        let rendered = inner.to_string();
        let err = AdapterError::from(inner);
        assert_eq!(
            err.to_string(),
            format!("serialization error: {}", rendered)
        );
    }

    #[test]
    fn test_connection_error_not_retryable() {
        // Connection errors cover both transient failures ("send failed") and
        // permanent ones ("adapter not initialized"). Since we can't distinguish
        // them at the type level, Connection is conservatively non-retryable.
        // `bus::dispatch_batch` honors `is_retryable()` and skips the retry
        // loop when this returns false, so a Connection error drops the
        // batch immediately rather than burning the retry budget.
        assert!(!AdapterError::Connection("refused".into()).is_retryable());
        assert!(!AdapterError::Connection("adapter not initialized".into()).is_retryable());
    }

    #[test]
    fn test_consumer_error_from_adapter() {
        let adapter_err = AdapterError::Connection("refused".into());
        let consumer_err: ConsumerError = adapter_err.into();
        assert!(matches!(consumer_err, ConsumerError::Adapter(_)));
    }

    /// `ConsumerError::Adapter` wraps its cause with `#[from]`, so the
    /// original `AdapterError` must be reachable through `source()`.
    #[test]
    fn test_consumer_error_preserves_adapter_source() {
        let consumer_err = ConsumerError::from(AdapterError::Shutdown);
        let source = std::error::Error::source(&consumer_err)
            .expect("Adapter variant should expose its source");
        assert!(source.is::<AdapterError>());
    }
}