Skip to main content

net/
error.rs

1//! Error types for the Net event bus.
2
3use thiserror::Error;
4
5/// Errors that can occur during event ingestion.
6#[derive(Debug, Error)]
7pub enum IngestionError {
8    /// Ring buffer is full and backpressure policy rejected the event.
9    #[error("backpressure: ring buffer full")]
10    Backpressure,
11
12    /// Event was dropped due to sampling/decimation policy.
13    #[error("event dropped due to sampling")]
14    Sampled,
15
16    /// Hashed shard id is not in the routing table (e.g. a concurrent
17    /// scale-down removed it, or the shard is still provisioning).
18    /// Previously collapsed into `Backpressure`, which made callers
19    /// apply the wrong remediation (back-off-and-retry on a routing
20    /// miss is futile until the topology stabilizes). Distinct from
21    /// `Backpressure` so callers can distinguish "buffer full" from
22    /// "no destination".
23    #[error("event has no routable shard")]
24    Unrouted,
25
26    /// The event bus has been shut down.
27    #[error("event bus is shutting down")]
28    ShuttingDown,
29
30    /// Serialization failed. Wraps the underlying `serde_json::Error` so
31    /// callers can read the category, line, and column via `source()`.
32    #[error("serialization error: {0}")]
33    Serialization(#[from] serde_json::Error),
34}
35
36/// Errors that can occur in adapter operations.
37#[derive(Debug, Error)]
38pub enum AdapterError {
39    /// Transient error - operation can be retried.
40    #[error("transient error: {0}")]
41    Transient(String),
42
43    /// Fatal error - adapter is in an unrecoverable state.
44    #[error("fatal error: {0}")]
45    Fatal(String),
46
47    /// Backend cannot accept more data - apply backpressure.
48    #[error("backend backpressure")]
49    Backpressure,
50
51    /// Connection error.
52    #[error("connection error: {0}")]
53    Connection(String),
54
55    /// Serialization/deserialization error. Wraps the underlying
56    /// `serde_json::Error` so callers can read the category, line, and
57    /// column via `source()`.
58    #[error("serialization error: {0}")]
59    Serialization(#[from] serde_json::Error),
60}
61
62impl AdapterError {
63    /// Returns true if this error is retryable.
64    #[inline]
65    pub fn is_retryable(&self) -> bool {
66        matches!(self, Self::Transient(_) | Self::Backpressure)
67    }
68
69    /// Returns true if this error is fatal.
70    #[inline]
71    pub fn is_fatal(&self) -> bool {
72        matches!(self, Self::Fatal(_))
73    }
74}
75
76/// Errors that can occur during event consumption/polling.
77#[derive(Debug, Error)]
78pub enum ConsumerError {
79    /// Adapter error during polling.
80    #[error("adapter error: {0}")]
81    Adapter(#[from] AdapterError),
82
83    /// Invalid cursor format.
84    #[error("invalid cursor: {0}")]
85    InvalidCursor(String),
86
87    /// Invalid filter specification.
88    #[error("invalid filter: {0}")]
89    InvalidFilter(String),
90}
91
92/// Result type alias for ingestion operations.
93pub type IngestionResult<T> = Result<T, IngestionError>;
94
95/// Result type alias for adapter operations.
96pub type AdapterResult<T> = Result<T, AdapterError>;
97
98/// Result type alias for consumer operations.
99pub type ConsumerResult<T> = Result<T, ConsumerError>;
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104
105    fn make_serde_error() -> serde_json::Error {
106        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
107    }
108
109    #[test]
110    fn test_adapter_error_is_retryable() {
111        assert!(AdapterError::Transient("temp".into()).is_retryable());
112        assert!(AdapterError::Backpressure.is_retryable());
113        assert!(!AdapterError::Fatal("dead".into()).is_retryable());
114        assert!(!AdapterError::Connection("refused".into()).is_retryable());
115        assert!(!AdapterError::Serialization(make_serde_error()).is_retryable());
116    }
117
118    /// Regression: BUG_REPORT.md #18 — `Serialization` previously stored
119    /// the rendered error string and broke the `source()` chain. Wrapping
120    /// `serde_json::Error` directly preserves the category/line/column.
121    #[test]
122    fn test_serialization_error_preserves_source() {
123        let err = AdapterError::Serialization(make_serde_error());
124        // The Display impl still renders the inner error.
125        assert!(err.to_string().contains("serialization error"));
126        // And the source chain points at the original serde_json::Error.
127        let source = std::error::Error::source(&err)
128            .expect("Serialization variant should expose its source");
129        assert!(source.is::<serde_json::Error>());
130    }
131
132    #[test]
133    fn test_adapter_error_is_fatal() {
134        assert!(AdapterError::Fatal("dead".into()).is_fatal());
135        assert!(!AdapterError::Transient("temp".into()).is_fatal());
136        assert!(!AdapterError::Backpressure.is_fatal());
137        assert!(!AdapterError::Connection("refused".into()).is_fatal());
138    }
139
140    #[test]
141    fn test_error_display() {
142        assert_eq!(
143            IngestionError::Backpressure.to_string(),
144            "backpressure: ring buffer full"
145        );
146        assert_eq!(
147            IngestionError::Sampled.to_string(),
148            "event dropped due to sampling"
149        );
150        assert_eq!(
151            IngestionError::Unrouted.to_string(),
152            "event has no routable shard"
153        );
154        assert_eq!(
155            IngestionError::ShuttingDown.to_string(),
156            "event bus is shutting down"
157        );
158        assert_eq!(
159            AdapterError::Transient("timeout".into()).to_string(),
160            "transient error: timeout"
161        );
162        assert_eq!(
163            AdapterError::Fatal("crash".into()).to_string(),
164            "fatal error: crash"
165        );
166        assert_eq!(
167            AdapterError::Backpressure.to_string(),
168            "backend backpressure"
169        );
170    }
171
172    #[test]
173    fn test_connection_error_not_retryable() {
174        // Connection errors cover both transient failures ("send failed") and
175        // permanent ones ("adapter not initialized"). Since we can't distinguish
176        // them at the type level, Connection is conservatively non-retryable.
177        // `bus::dispatch_batch` honors `is_retryable()` and skips the retry
178        // loop when this returns false, so a Connection error drops the
179        // batch immediately rather than burning the retry budget.
180        assert!(!AdapterError::Connection("refused".into()).is_retryable());
181        assert!(!AdapterError::Connection("adapter not initialized".into()).is_retryable());
182    }
183
184    #[test]
185    fn test_consumer_error_from_adapter() {
186        let adapter_err = AdapterError::Connection("refused".into());
187        let consumer_err: ConsumerError = adapter_err.into();
188        assert!(matches!(consumer_err, ConsumerError::Adapter(_)));
189    }
190}