1use thiserror::Error;
4
5#[derive(Debug, Error)]
7pub enum IngestionError {
8 #[error("backpressure: ring buffer full")]
10 Backpressure,
11
12 #[error("event dropped due to sampling")]
14 Sampled,
15
16 #[error("event has no routable shard")]
24 Unrouted,
25
26 #[error("event bus is shutting down")]
28 ShuttingDown,
29
30 #[error("serialization error: {0}")]
33 Serialization(#[from] serde_json::Error),
34}
35
36#[derive(Debug, Error)]
38pub enum AdapterError {
39 #[error("transient error: {0}")]
41 Transient(String),
42
43 #[error("fatal error: {0}")]
45 Fatal(String),
46
47 #[error("backend backpressure")]
49 Backpressure,
50
51 #[error("connection error: {0}")]
53 Connection(String),
54
55 #[error("serialization error: {0}")]
59 Serialization(#[from] serde_json::Error),
60}
61
62impl AdapterError {
63 #[inline]
65 pub fn is_retryable(&self) -> bool {
66 matches!(self, Self::Transient(_) | Self::Backpressure)
67 }
68
69 #[inline]
71 pub fn is_fatal(&self) -> bool {
72 matches!(self, Self::Fatal(_))
73 }
74}
75
76#[derive(Debug, Error)]
78pub enum ConsumerError {
79 #[error("adapter error: {0}")]
81 Adapter(#[from] AdapterError),
82
83 #[error("invalid cursor: {0}")]
85 InvalidCursor(String),
86
87 #[error("invalid filter: {0}")]
89 InvalidFilter(String),
90}
91
92pub type IngestionResult<T> = Result<T, IngestionError>;
94
95pub type AdapterResult<T> = Result<T, AdapterError>;
97
98pub type ConsumerResult<T> = Result<T, ConsumerError>;
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104
105 fn make_serde_error() -> serde_json::Error {
106 serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
107 }
108
109 #[test]
110 fn test_adapter_error_is_retryable() {
111 assert!(AdapterError::Transient("temp".into()).is_retryable());
112 assert!(AdapterError::Backpressure.is_retryable());
113 assert!(!AdapterError::Fatal("dead".into()).is_retryable());
114 assert!(!AdapterError::Connection("refused".into()).is_retryable());
115 assert!(!AdapterError::Serialization(make_serde_error()).is_retryable());
116 }
117
118 #[test]
122 fn test_serialization_error_preserves_source() {
123 let err = AdapterError::Serialization(make_serde_error());
124 assert!(err.to_string().contains("serialization error"));
126 let source = std::error::Error::source(&err)
128 .expect("Serialization variant should expose its source");
129 assert!(source.is::<serde_json::Error>());
130 }
131
132 #[test]
133 fn test_adapter_error_is_fatal() {
134 assert!(AdapterError::Fatal("dead".into()).is_fatal());
135 assert!(!AdapterError::Transient("temp".into()).is_fatal());
136 assert!(!AdapterError::Backpressure.is_fatal());
137 assert!(!AdapterError::Connection("refused".into()).is_fatal());
138 }
139
140 #[test]
141 fn test_error_display() {
142 assert_eq!(
143 IngestionError::Backpressure.to_string(),
144 "backpressure: ring buffer full"
145 );
146 assert_eq!(
147 IngestionError::Sampled.to_string(),
148 "event dropped due to sampling"
149 );
150 assert_eq!(
151 IngestionError::Unrouted.to_string(),
152 "event has no routable shard"
153 );
154 assert_eq!(
155 IngestionError::ShuttingDown.to_string(),
156 "event bus is shutting down"
157 );
158 assert_eq!(
159 AdapterError::Transient("timeout".into()).to_string(),
160 "transient error: timeout"
161 );
162 assert_eq!(
163 AdapterError::Fatal("crash".into()).to_string(),
164 "fatal error: crash"
165 );
166 assert_eq!(
167 AdapterError::Backpressure.to_string(),
168 "backend backpressure"
169 );
170 }
171
172 #[test]
173 fn test_connection_error_not_retryable() {
174 assert!(!AdapterError::Connection("refused".into()).is_retryable());
181 assert!(!AdapterError::Connection("adapter not initialized".into()).is_retryable());
182 }
183
184 #[test]
185 fn test_consumer_error_from_adapter() {
186 let adapter_err = AdapterError::Connection("refused".into());
187 let consumer_err: ConsumerError = adapter_err.into();
188 assert!(matches!(consumer_err, ConsumerError::Adapter(_)));
189 }
190}