1use thiserror::Error;
6
7#[derive(Error, Debug)]
9pub enum ConfigError {
10 #[error("invalid compression ratio: virtual_partitions ({virtual_partitions}) must be divisible by physical_partitions ({physical_partitions})")]
12 InvalidCompressionRatio {
13 virtual_partitions: u32,
14 physical_partitions: u32,
15 },
16
17 #[error("offset_range must be at least 2^20 (1048576), got {0}")]
19 OffsetRangeTooSmall(u64),
20
21 #[error("physical_partitions must be at least 1, got {0}")]
23 InvalidPhysicalPartitions(u32),
24
25 #[error("virtual_partitions ({virtual_partitions}) must be >= physical_partitions ({physical_partitions})")]
27 VirtualLessThanPhysical {
28 virtual_partitions: u32,
29 physical_partitions: u32,
30 },
31
32 #[error("failed to read config file '{path}': {source}")]
34 IoError {
35 path: String,
36 #[source]
37 source: std::io::Error,
38 },
39
40 #[error("failed to parse config: {0}")]
42 ParseError(#[from] serde_yaml::Error),
43
44 #[error("invalid address format: {0} (expected 'host:port')")]
46 InvalidAddress(String),
47
48 #[error("validation error: {0}")]
50 ValidationError(String),
51}
52
53#[derive(Error, Debug)]
55pub enum ProxyError {
56 #[error("connection error: {0}")]
58 Connection(#[from] std::io::Error),
59
60 #[error("protocol decode error: {message}")]
62 ProtocolDecode { message: String },
63
64 #[error("protocol encode error: {message}")]
66 ProtocolEncode { message: String },
67
68 #[error("broker {broker_id} unavailable: {message}")]
70 BrokerUnavailable { broker_id: i32, message: String },
71
72 #[error("no brokers available")]
74 NoBrokersAvailable,
75
76 #[error("topic not found: {topic}")]
78 TopicNotFound { topic: String },
79
80 #[error("partition {partition} out of range for topic {topic} (max: {max_partition})")]
82 PartitionOutOfRange {
83 topic: String,
84 partition: i32,
85 max_partition: i32,
86 },
87
88 #[error("offset overflow: virtual offset {offset} exceeds range for partition {partition}")]
90 OffsetOverflow { partition: i32, offset: i64 },
91
92 #[error("correlation ID mismatch: expected {expected}, got {actual}")]
94 CorrelationIdMismatch { expected: i32, actual: i32 },
95
96 #[error("unsupported API: key={api_key}, version={api_version}")]
98 UnsupportedApi { api_key: i16, api_version: i16 },
99
100 #[error("remapping error: {0}")]
102 Remap(#[from] RemapError),
103
104 #[error("TLS error: {0}")]
106 Tls(#[from] TlsError),
107
108 #[error("authentication error: {0}")]
110 Auth(#[from] AuthError),
111
112 #[error("proxy shutting down")]
114 Shutdown,
115}
116
117#[derive(Error, Debug)]
119pub enum TlsError {
120 #[error("failed to load certificate from '{path}': {message}")]
122 CertificateLoad { path: String, message: String },
123
124 #[error("failed to load private key from '{path}': {message}")]
126 PrivateKeyLoad { path: String, message: String },
127
128 #[error("TLS configuration error: {0}")]
130 Config(String),
131
132 #[error("TLS handshake failed: {0}")]
134 Handshake(String),
135
136 #[error("invalid certificate: {0}")]
138 InvalidCertificate(String),
139
140 #[error("no certificates found in '{0}'")]
142 NoCertificates(String),
143
144 #[error("no private keys found in '{0}'")]
146 NoPrivateKeys(String),
147}
148
149#[derive(Error, Debug)]
151pub enum AuthError {
152 #[error("unsupported SASL mechanism: {0}")]
154 UnsupportedMechanism(String),
155
156 #[error("authentication failed: {0}")]
158 AuthenticationFailed(String),
159
160 #[error("invalid credentials")]
162 InvalidCredentials,
163
164 #[error("SASL handshake error: {0}")]
166 HandshakeError(String),
167
168 #[error("missing SASL configuration: {0}")]
170 MissingConfig(String),
171
172 #[error("invalid SASL message: {0}")]
174 InvalidMessage(String),
175
176 #[error("credential store error: {0}")]
178 CredentialStore(String),
179
180 #[error("authentication configuration error: {0}")]
182 Configuration(String),
183
184 #[error("OAuth token validation failed: {0}")]
186 TokenValidationFailed(String),
187
188 #[error("OAuth token expired")]
190 TokenExpired,
191
192 #[error("OAuth token signature invalid")]
194 TokenSignatureInvalid,
195
196 #[error("missing required OAuth scope: {0}")]
198 MissingScope(String),
199
200 #[error("unexpected authentication error: {0}")]
202 Unexpected(String),
203}
204
205#[derive(Error, Debug, Clone, PartialEq, Eq)]
207pub enum RemapError {
208 #[error("virtual partition {partition} exceeds maximum {max_partition}")]
210 VirtualPartitionOutOfRange { partition: i32, max_partition: u32 },
211
212 #[error("negative partition index: {0}")]
214 NegativePartition(i32),
215
216 #[error("physical offset {offset} does not belong to any virtual partition in physical partition {partition}")]
218 InvalidPhysicalOffset { offset: i64, partition: i32 },
219
220 #[error("negative offset: {0}")]
222 NegativeOffset(i64),
223}
224
225pub type Result<T> = std::result::Result<T, ProxyError>;
227
228pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
230
231pub type RemapResult<T> = std::result::Result<T, RemapError>;
233
234pub type TlsResult<T> = std::result::Result<T, TlsError>;
236
237pub type AuthResult<T> = std::result::Result<T, AuthError>;
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[test]
245 fn test_config_error_display() {
246 let err = ConfigError::InvalidCompressionRatio {
247 virtual_partitions: 100,
248 physical_partitions: 30,
249 };
250 assert!(err.to_string().contains("100"));
251 assert!(err.to_string().contains("30"));
252 }
253
254 #[test]
255 fn test_remap_error_display() {
256 let err = RemapError::VirtualPartitionOutOfRange {
257 partition: 150,
258 max_partition: 100,
259 };
260 assert!(err.to_string().contains("150"));
261 assert!(err.to_string().contains("100"));
262 }
263
264 #[test]
265 fn test_proxy_error_from_io() {
266 let io_err = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "test");
267 let proxy_err: ProxyError = io_err.into();
268 assert!(matches!(proxy_err, ProxyError::Connection(_)));
269 }
270
271 #[test]
272 fn test_proxy_error_from_remap() {
273 let remap_err = RemapError::NegativePartition(-1);
274 let proxy_err: ProxyError = remap_err.into();
275 assert!(matches!(proxy_err, ProxyError::Remap(_)));
276 }
277
278 #[test]
279 fn test_proxy_error_from_tls() {
280 let tls_err = TlsError::Config("test error".to_string());
281 let proxy_err: ProxyError = tls_err.into();
282 assert!(matches!(proxy_err, ProxyError::Tls(_)));
283 }
284
285 #[test]
286 fn test_proxy_error_from_auth() {
287 let auth_err = AuthError::AuthenticationFailed("invalid credentials".to_string());
288 let proxy_err: ProxyError = auth_err.into();
289 assert!(matches!(proxy_err, ProxyError::Auth(_)));
290 }
291
292 #[test]
293 fn test_tls_error_display() {
294 let err = TlsError::CertificateLoad {
295 path: "/etc/ssl/cert.pem".to_string(),
296 message: "file not found".to_string(),
297 };
298 let msg = err.to_string();
299 assert!(msg.contains("/etc/ssl/cert.pem"));
300 assert!(msg.contains("file not found"));
301 }
302
303 #[test]
304 fn test_auth_error_display() {
305 let err = AuthError::UnsupportedMechanism("GSSAPI".to_string());
306 assert!(err.to_string().contains("GSSAPI"));
307 }
308}