kafka_remapper_core/
error.rs

1//! Domain error types for the Kafka partition remapping proxy.
2//!
3//! Uses `thiserror` for ergonomic error definitions with proper context.
4
5use thiserror::Error;
6
7/// Errors related to configuration parsing and validation.
8#[derive(Error, Debug)]
9pub enum ConfigError {
10    /// Virtual partitions must be evenly divisible by physical partitions.
11    #[error("invalid compression ratio: virtual_partitions ({virtual_partitions}) must be divisible by physical_partitions ({physical_partitions})")]
12    InvalidCompressionRatio {
13        virtual_partitions: u32,
14        physical_partitions: u32,
15    },
16
17    /// Offset range must be large enough to prevent overlap.
18    #[error("offset_range must be at least 2^20 (1048576), got {0}")]
19    OffsetRangeTooSmall(u64),
20
21    /// Physical partitions must be at least 1.
22    #[error("physical_partitions must be at least 1, got {0}")]
23    InvalidPhysicalPartitions(u32),
24
25    /// Virtual partitions must be at least equal to physical partitions.
26    #[error("virtual_partitions ({virtual_partitions}) must be >= physical_partitions ({physical_partitions})")]
27    VirtualLessThanPhysical {
28        virtual_partitions: u32,
29        physical_partitions: u32,
30    },
31
32    /// Failed to read configuration file.
33    #[error("failed to read config file '{path}': {source}")]
34    IoError {
35        path: String,
36        #[source]
37        source: std::io::Error,
38    },
39
40    /// Failed to parse YAML configuration.
41    #[error("failed to parse config: {0}")]
42    ParseError(#[from] serde_yaml::Error),
43
44    /// Invalid address format.
45    #[error("invalid address format: {0} (expected 'host:port')")]
46    InvalidAddress(String),
47
48    /// General validation error.
49    #[error("validation error: {0}")]
50    ValidationError(String),
51}
52
53/// Errors that occur during proxy operation.
54#[derive(Error, Debug)]
55pub enum ProxyError {
56    /// TCP/IO connection error.
57    #[error("connection error: {0}")]
58    Connection(#[from] std::io::Error),
59
60    /// Failed to decode Kafka protocol message.
61    #[error("protocol decode error: {message}")]
62    ProtocolDecode { message: String },
63
64    /// Failed to encode Kafka protocol message.
65    #[error("protocol encode error: {message}")]
66    ProtocolEncode { message: String },
67
68    /// Broker is not available or connection failed.
69    #[error("broker {broker_id} unavailable: {message}")]
70    BrokerUnavailable { broker_id: i32, message: String },
71
72    /// No brokers available to handle request.
73    #[error("no brokers available")]
74    NoBrokersAvailable,
75
76    /// Topic was not found in metadata.
77    #[error("topic not found: {topic}")]
78    TopicNotFound { topic: String },
79
80    /// Partition index out of valid range.
81    #[error("partition {partition} out of range for topic {topic} (max: {max_partition})")]
82    PartitionOutOfRange {
83        topic: String,
84        partition: i32,
85        max_partition: i32,
86    },
87
88    /// Offset would overflow the configured range.
89    #[error("offset overflow: virtual offset {offset} exceeds range for partition {partition}")]
90    OffsetOverflow { partition: i32, offset: i64 },
91
92    /// Request correlation ID mismatch.
93    #[error("correlation ID mismatch: expected {expected}, got {actual}")]
94    CorrelationIdMismatch { expected: i32, actual: i32 },
95
96    /// Unsupported API key/version.
97    #[error("unsupported API: key={api_key}, version={api_version}")]
98    UnsupportedApi { api_key: i16, api_version: i16 },
99
100    /// Remapping error.
101    #[error("remapping error: {0}")]
102    Remap(#[from] RemapError),
103
104    /// TLS error.
105    #[error("TLS error: {0}")]
106    Tls(#[from] TlsError),
107
108    /// SASL authentication error.
109    #[error("authentication error: {0}")]
110    Auth(#[from] AuthError),
111
112    /// Shutdown signal received.
113    #[error("proxy shutting down")]
114    Shutdown,
115}
116
117/// Errors related to TLS/SSL operations.
118#[derive(Error, Debug)]
119pub enum TlsError {
120    /// Failed to load certificate file.
121    #[error("failed to load certificate from '{path}': {message}")]
122    CertificateLoad { path: String, message: String },
123
124    /// Failed to load private key file.
125    #[error("failed to load private key from '{path}': {message}")]
126    PrivateKeyLoad { path: String, message: String },
127
128    /// Failed to build TLS configuration.
129    #[error("TLS configuration error: {0}")]
130    Config(String),
131
132    /// TLS handshake failed.
133    #[error("TLS handshake failed: {0}")]
134    Handshake(String),
135
136    /// Invalid certificate.
137    #[error("invalid certificate: {0}")]
138    InvalidCertificate(String),
139
140    /// No certificates found in file.
141    #[error("no certificates found in '{0}'")]
142    NoCertificates(String),
143
144    /// No private keys found in file.
145    #[error("no private keys found in '{0}'")]
146    NoPrivateKeys(String),
147}
148
149/// Errors related to SASL authentication.
150#[derive(Error, Debug)]
151pub enum AuthError {
152    /// SASL mechanism not supported.
153    #[error("unsupported SASL mechanism: {0}")]
154    UnsupportedMechanism(String),
155
156    /// Authentication failed (invalid credentials).
157    #[error("authentication failed: {0}")]
158    AuthenticationFailed(String),
159
160    /// Invalid credentials provided.
161    #[error("invalid credentials")]
162    InvalidCredentials,
163
164    /// SASL handshake failed.
165    #[error("SASL handshake error: {0}")]
166    HandshakeError(String),
167
168    /// Missing required configuration.
169    #[error("missing SASL configuration: {0}")]
170    MissingConfig(String),
171
172    /// Invalid SASL protocol message.
173    #[error("invalid SASL message: {0}")]
174    InvalidMessage(String),
175
176    /// Credential store error.
177    #[error("credential store error: {0}")]
178    CredentialStore(String),
179
180    /// Configuration error (e.g., OAUTHBEARER setup).
181    #[error("authentication configuration error: {0}")]
182    Configuration(String),
183
184    /// OAuth token validation failed.
185    #[error("OAuth token validation failed: {0}")]
186    TokenValidationFailed(String),
187
188    /// OAuth token has expired.
189    #[error("OAuth token expired")]
190    TokenExpired,
191
192    /// OAuth token signature verification failed.
193    #[error("OAuth token signature invalid")]
194    TokenSignatureInvalid,
195
196    /// Required scope missing from OAuth token.
197    #[error("missing required OAuth scope: {0}")]
198    MissingScope(String),
199
200    /// Unexpected error during authentication.
201    #[error("unexpected authentication error: {0}")]
202    Unexpected(String),
203}
204
205/// Errors specific to partition/offset remapping operations.
206#[derive(Error, Debug, Clone, PartialEq, Eq)]
207pub enum RemapError {
208    /// Virtual partition index exceeds configured maximum.
209    #[error("virtual partition {partition} exceeds maximum {max_partition}")]
210    VirtualPartitionOutOfRange { partition: i32, max_partition: u32 },
211
212    /// Negative partition index is invalid.
213    #[error("negative partition index: {0}")]
214    NegativePartition(i32),
215
216    /// Physical offset does not map to any valid virtual partition.
217    #[error("physical offset {offset} does not belong to any virtual partition in physical partition {partition}")]
218    InvalidPhysicalOffset { offset: i64, partition: i32 },
219
220    /// Negative offset is invalid for mapping.
221    #[error("negative offset: {0}")]
222    NegativeOffset(i64),
223}
224
225/// Result type alias for proxy operations.
226pub type Result<T> = std::result::Result<T, ProxyError>;
227
228/// Result type alias for configuration operations.
229pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
230
231/// Result type alias for remapping operations.
232pub type RemapResult<T> = std::result::Result<T, RemapError>;
233
234/// Result type alias for TLS operations.
235pub type TlsResult<T> = std::result::Result<T, TlsError>;
236
237/// Result type alias for authentication operations.
238pub type AuthResult<T> = std::result::Result<T, AuthError>;
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Both partition counts appear in the rendered `InvalidCompressionRatio` message.
    #[test]
    fn test_config_error_display() {
        let rendered = ConfigError::InvalidCompressionRatio {
            virtual_partitions: 100,
            physical_partitions: 30,
        }
        .to_string();

        assert!(rendered.contains("100"));
        assert!(rendered.contains("30"));
    }

    /// The partition index and the maximum both appear in the rendered message.
    #[test]
    fn test_remap_error_display() {
        let rendered = RemapError::VirtualPartitionOutOfRange {
            partition: 150,
            max_partition: 100,
        }
        .to_string();

        assert!(rendered.contains("150"));
        assert!(rendered.contains("100"));
    }

    /// An `std::io::Error` converts into `ProxyError::Connection`.
    #[test]
    fn test_proxy_error_from_io() {
        let source = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "test");
        let converted = ProxyError::from(source);
        assert!(matches!(converted, ProxyError::Connection(_)));
    }

    /// A `RemapError` converts into `ProxyError::Remap`.
    #[test]
    fn test_proxy_error_from_remap() {
        let converted = ProxyError::from(RemapError::NegativePartition(-1));
        assert!(matches!(converted, ProxyError::Remap(_)));
    }

    /// A `TlsError` converts into `ProxyError::Tls`.
    #[test]
    fn test_proxy_error_from_tls() {
        let converted = ProxyError::from(TlsError::Config("test error".to_string()));
        assert!(matches!(converted, ProxyError::Tls(_)));
    }

    /// An `AuthError` converts into `ProxyError::Auth`.
    #[test]
    fn test_proxy_error_from_auth() {
        let source = AuthError::AuthenticationFailed("invalid credentials".to_string());
        let converted = ProxyError::from(source);
        assert!(matches!(converted, ProxyError::Auth(_)));
    }

    /// Path and detail message both appear in the rendered `CertificateLoad` message.
    #[test]
    fn test_tls_error_display() {
        let rendered = TlsError::CertificateLoad {
            path: "/etc/ssl/cert.pem".to_string(),
            message: "file not found".to_string(),
        }
        .to_string();

        assert!(rendered.contains("/etc/ssl/cert.pem"));
        assert!(rendered.contains("file not found"));
    }

    /// The mechanism name appears in the rendered `UnsupportedMechanism` message.
    #[test]
    fn test_auth_error_display() {
        let rendered = AuthError::UnsupportedMechanism("GSSAPI".to_string()).to_string();
        assert!(rendered.contains("GSSAPI"));
    }
}