1use thiserror::Error;
4
/// Failure modes for ingesting an event, as described by each variant's display message.
///
/// `Display` and `std::error::Error` are derived with `thiserror`.
// NOTE(review): where each variant is produced is not visible in this file — confirm
// against the call sites before relying on the wording below.
#[derive(Debug, Error)]
pub enum IngestionError {
    /// Signals backpressure: the ring buffer is full.
    #[error("backpressure: ring buffer full")]
    Backpressure,

    /// The event was dropped due to sampling.
    #[error("event dropped due to sampling")]
    Sampled,

    /// The event has no routable shard.
    #[error("event has no routable shard")]
    Unrouted,

    /// The event bus is shutting down.
    #[error("event bus is shutting down")]
    ShuttingDown,

    /// A `serde_json` failure.
    ///
    /// `#[from]` generates `From<serde_json::Error>`, so `?` converts automatically, and the
    /// wrapped error is also returned by `std::error::Error::source`.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
}
35
/// Failure modes of an adapter, as described by each variant's display message.
///
/// `Display` and `std::error::Error` are derived with `thiserror`. The inherent predicates
/// [`AdapterError::is_retryable`], [`AdapterError::is_fatal`] and
/// [`AdapterError::is_shutdown`] classify the variants.
#[derive(Debug, Error)]
pub enum AdapterError {
    /// A transient failure; the payload is a description appended to the message.
    ///
    /// [`AdapterError::is_retryable`] returns `true` for this variant.
    #[error("transient error: {0}")]
    Transient(String),

    /// A fatal failure; the payload is a description appended to the message.
    ///
    /// This is the only variant for which [`AdapterError::is_fatal`] returns `true`.
    #[error("fatal error: {0}")]
    Fatal(String),

    /// The backend is applying backpressure.
    ///
    /// [`AdapterError::is_retryable`] returns `true` for this variant.
    #[error("backend backpressure")]
    Backpressure,

    /// A connection failure; the payload is a description appended to the message.
    ///
    /// [`AdapterError::is_retryable`] returns `false` for this variant.
    // NOTE(review): the rationale for treating connection errors as non-retryable is not
    // visible in this file — confirm before changing the classification.
    #[error("connection error: {0}")]
    Connection(String),

    /// The adapter is shut down.
    ///
    /// This is the only variant for which [`AdapterError::is_shutdown`] returns `true`.
    #[error("adapter is shut down")]
    Shutdown,

    /// A `serde_json` failure.
    ///
    /// `#[from]` generates `From<serde_json::Error>`, so `?` converts automatically, and the
    /// wrapped error is also returned by `std::error::Error::source`.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
}
71
72impl AdapterError {
73 #[inline]
75 pub fn is_retryable(&self) -> bool {
76 matches!(self, Self::Transient(_) | Self::Backpressure)
77 }
78
79 #[inline]
81 pub fn is_fatal(&self) -> bool {
82 matches!(self, Self::Fatal(_))
83 }
84
85 #[inline]
89 pub fn is_shutdown(&self) -> bool {
90 matches!(self, Self::Shutdown)
91 }
92}
93
/// Failure modes on the consumer side, as described by each variant's display message.
///
/// `Display` and `std::error::Error` are derived with `thiserror`.
#[derive(Debug, Error)]
pub enum ConsumerError {
    /// An [`AdapterError`] surfaced to the consumer.
    ///
    /// `#[from]` generates `From<AdapterError>`, so `?` converts automatically, and the
    /// wrapped error is also returned by `std::error::Error::source`.
    #[error("adapter error: {0}")]
    Adapter(#[from] AdapterError),

    /// The cursor is invalid; the payload is a description appended to the message.
    #[error("invalid cursor: {0}")]
    InvalidCursor(String),

    /// The filter is invalid; the payload is a description appended to the message.
    #[error("invalid filter: {0}")]
    InvalidFilter(String),
}
109
/// Shorthand for a `Result` whose error type is [`IngestionError`].
pub type IngestionResult<T> = Result<T, IngestionError>;

/// Shorthand for a `Result` whose error type is [`AdapterError`].
pub type AdapterResult<T> = Result<T, AdapterError>;

/// Shorthand for a `Result` whose error type is [`ConsumerError`].
pub type ConsumerResult<T> = Result<T, ConsumerError>;
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Produces a real `serde_json::Error` by parsing text that is not valid JSON.
    fn make_serde_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
    }

    /// Only `Transient` and `Backpressure` are retryable; all six variants are checked.
    #[test]
    fn test_adapter_error_is_retryable() {
        assert!(AdapterError::Transient("temp".into()).is_retryable());
        assert!(AdapterError::Backpressure.is_retryable());
        assert!(!AdapterError::Fatal("dead".into()).is_retryable());
        assert!(!AdapterError::Connection("refused".into()).is_retryable());
        assert!(!AdapterError::Shutdown.is_retryable());
        assert!(!AdapterError::Serialization(make_serde_error()).is_retryable());
    }

    /// Only `Shutdown` reports `is_shutdown`; all six variants are checked.
    #[test]
    fn test_adapter_error_is_shutdown_only_for_shutdown() {
        assert!(AdapterError::Shutdown.is_shutdown());
        assert!(!AdapterError::Connection("refused".into()).is_shutdown());
        assert!(!AdapterError::Fatal("dead".into()).is_shutdown());
        assert!(!AdapterError::Transient("temp".into()).is_shutdown());
        assert!(!AdapterError::Backpressure.is_shutdown());
        assert!(!AdapterError::Serialization(make_serde_error()).is_shutdown());
    }

    /// The `#[from]` field must stay reachable through `Error::source`.
    #[test]
    fn test_serialization_error_preserves_source() {
        let err = AdapterError::Serialization(make_serde_error());
        assert!(err.to_string().contains("serialization error"));
        let source = std::error::Error::source(&err)
            .expect("Serialization variant should expose its source");
        assert!(source.is::<serde_json::Error>());
    }

    /// Only `Fatal` reports `is_fatal`; all six variants are checked.
    #[test]
    fn test_adapter_error_is_fatal() {
        assert!(AdapterError::Fatal("dead".into()).is_fatal());
        assert!(!AdapterError::Transient("temp".into()).is_fatal());
        assert!(!AdapterError::Backpressure.is_fatal());
        assert!(!AdapterError::Connection("refused".into()).is_fatal());
        assert!(!AdapterError::Shutdown.is_fatal());
        assert!(!AdapterError::Serialization(make_serde_error()).is_fatal());
    }

    /// Pins the exact display text of every variant with a deterministic message.
    #[test]
    fn test_error_display() {
        assert_eq!(
            IngestionError::Backpressure.to_string(),
            "backpressure: ring buffer full"
        );
        assert_eq!(
            IngestionError::Sampled.to_string(),
            "event dropped due to sampling"
        );
        assert_eq!(
            IngestionError::Unrouted.to_string(),
            "event has no routable shard"
        );
        assert_eq!(
            IngestionError::ShuttingDown.to_string(),
            "event bus is shutting down"
        );
        assert_eq!(
            AdapterError::Transient("timeout".into()).to_string(),
            "transient error: timeout"
        );
        assert_eq!(
            AdapterError::Fatal("crash".into()).to_string(),
            "fatal error: crash"
        );
        assert_eq!(
            AdapterError::Backpressure.to_string(),
            "backend backpressure"
        );
        assert_eq!(
            AdapterError::Connection("refused".into()).to_string(),
            "connection error: refused"
        );
        assert_eq!(AdapterError::Shutdown.to_string(), "adapter is shut down");
    }

    /// Pins the display text of the consumer-side variants, including the nested adapter
    /// message.
    #[test]
    fn test_consumer_error_display() {
        assert_eq!(
            ConsumerError::InvalidCursor("abc".into()).to_string(),
            "invalid cursor: abc"
        );
        assert_eq!(
            ConsumerError::InvalidFilter("bad op".into()).to_string(),
            "invalid filter: bad op"
        );
        assert_eq!(
            ConsumerError::Adapter(AdapterError::Backpressure).to_string(),
            "adapter error: backend backpressure"
        );
    }

    /// Connection errors are not retryable, whatever the description says.
    #[test]
    fn test_connection_error_not_retryable() {
        assert!(!AdapterError::Connection("refused".into()).is_retryable());
        assert!(!AdapterError::Connection("adapter not initialized".into()).is_retryable());
    }

    /// `From<AdapterError>` keeps the original variant and exposes it as the source.
    #[test]
    fn test_consumer_error_from_adapter() {
        let adapter_err = AdapterError::Connection("refused".into());
        let consumer_err: ConsumerError = adapter_err.into();
        assert!(matches!(
            consumer_err,
            ConsumerError::Adapter(AdapterError::Connection(_))
        ));
        let source = std::error::Error::source(&consumer_err)
            .expect("Adapter variant should expose its source");
        assert!(source.is::<AdapterError>());
    }

    /// `From<serde_json::Error>` maps into the `Serialization` variant and keeps the source.
    #[test]
    fn test_ingestion_error_from_serde() {
        let err: IngestionError = make_serde_error().into();
        assert!(matches!(err, IngestionError::Serialization(_)));
        assert!(err.to_string().starts_with("serialization error: "));
        let source = std::error::Error::source(&err)
            .expect("Serialization variant should expose its source");
        assert!(source.is::<serde_json::Error>());
    }
}