Skip to main content

oxigdal_pubsub/
error.rs

//! Error types for OxiGDAL Pub/Sub operations.
//!
//! This module provides comprehensive error handling for Google Cloud Pub/Sub
//! operations including publishing, subscribing, schema validation, and monitoring.
5
6#[cfg(feature = "schema")]
7use std::fmt;
8
9/// Result type for Pub/Sub operations.
10pub type Result<T> = std::result::Result<T, PubSubError>;
11
/// Every failure a Pub/Sub operation in this crate can report.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
/// Variants with a `source` field expose that value through
/// [`std::error::Error::source`] when it is `Some`.
#[derive(Debug, thiserror::Error)]
pub enum PubSubError {
    /// A message could not be published.
    #[error("Publishing error: {message}")]
    PublishError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// A subscription operation failed.
    #[error("Subscription error: {message}")]
    SubscriptionError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// A message could not be acknowledged.
    #[error("Acknowledgment error: {message}")]
    AcknowledgmentError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// A payload did not validate against its schema.
    #[cfg(feature = "schema")]
    #[error("Schema validation error: {message}")]
    SchemaValidationError {
        /// Description of what went wrong.
        message: String,
        /// Identifier of the schema that failed validation, if available.
        schema_id: Option<String>,
    },

    /// A payload could not be encoded with a schema.
    #[cfg(feature = "schema")]
    #[error("Schema encoding error: {message}")]
    SchemaEncodingError {
        /// Description of what went wrong.
        message: String,
        /// Schema format involved in the failure.
        format: SchemaFormat,
    },

    /// A payload could not be decoded with a schema.
    #[cfg(feature = "schema")]
    #[error("Schema decoding error: {message}")]
    SchemaDecodingError {
        /// Description of what went wrong.
        message: String,
        /// Schema format involved in the failure.
        format: SchemaFormat,
    },

    /// A batch of messages failed.
    #[error("Batching error: {message}")]
    BatchingError {
        /// Description of what went wrong.
        message: String,
        /// How many messages the failed batch held.
        batch_size: usize,
    },

    /// A flow-control limit was hit.
    #[error("Flow control error: {message}")]
    FlowControlError {
        /// Description of what went wrong.
        message: String,
        /// Message count at the time of the failure.
        current_count: usize,
        /// Largest message count that is allowed.
        max_count: usize,
    },

    /// A dead letter queue operation failed.
    #[error("Dead letter queue error: {message}")]
    DeadLetterQueueError {
        /// Description of what went wrong.
        message: String,
        /// Identifier of the message that failed.
        message_id: String,
    },

    /// An ordering key caused a failure.
    #[error("Ordering key error: {message}")]
    OrderingKeyError {
        /// Description of what went wrong.
        message: String,
        /// The ordering key responsible for the error.
        ordering_key: String,
    },

    /// A monitoring operation failed.
    #[cfg(feature = "monitoring")]
    #[error("Monitoring error: {message}")]
    MonitoringError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// Authentication failed.
    #[error("Authentication error: {message}")]
    AuthenticationError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// A configuration value was rejected.
    #[error("Configuration error: {message}")]
    ConfigurationError {
        /// Description of what went wrong.
        message: String,
        /// Name of the offending parameter.
        parameter: String,
    },

    /// The named topic does not exist.
    #[error("Topic not found: {topic_name}")]
    TopicNotFound {
        /// Name of the missing topic.
        topic_name: String,
    },

    /// The named subscription does not exist.
    #[error("Subscription not found: {subscription_name}")]
    SubscriptionNotFound {
        /// Name of the missing subscription.
        subscription_name: String,
    },

    /// A message exceeded the permitted size.
    #[error("Message too large: {size} bytes (max: {max_size} bytes)")]
    MessageTooLarge {
        /// Size of the rejected message, in bytes.
        size: usize,
        /// Largest permitted size, in bytes.
        max_size: usize,
    },

    /// A message was not in the expected format.
    #[error("Invalid message format: {message}")]
    InvalidMessageFormat {
        /// Description of what went wrong.
        message: String,
    },

    /// An operation ran out of time.
    #[error("Operation timed out after {duration_ms}ms")]
    Timeout {
        /// Timeout duration, in milliseconds.
        duration_ms: u64,
    },

    /// A network-level failure.
    #[error("Network error: {message}")]
    NetworkError {
        /// Description of what went wrong.
        message: String,
        /// Underlying cause, when one is known.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// A resource was exhausted.
    #[error("Resource exhausted: {resource}")]
    ResourceExhausted {
        /// Name of the exhausted resource.
        resource: String,
        /// Suggested wait before retrying, in seconds, if provided.
        retry_after: Option<u64>,
    },

    /// The caller lacks permission for an operation.
    #[error("Permission denied: {operation}")]
    PermissionDenied {
        /// The operation that was refused.
        operation: String,
    },

    /// An internal failure.
    #[error("Internal error: {message}")]
    InternalError {
        /// Description of what went wrong.
        message: String,
    },

    /// An I/O failure; converted automatically from [`std::io::Error`].
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A JSON (de)serialization failure; converted automatically from `serde_json::Error`.
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),

    /// An error reported by the Google Cloud Pub/Sub client, kept as text.
    #[error("Pub/Sub client error: {0}")]
    ClientError(String),
}
225
/// Schema formats referenced by the schema-related [`PubSubError`] variants.
#[cfg(feature = "schema")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaFormat {
    /// Apache Avro.
    Avro,
    /// Protocol Buffers.
    Protobuf,
}
235
#[cfg(feature = "schema")]
impl fmt::Display for SchemaFormat {
    /// Writes the format's name: `Avro` or `Protobuf`.
    ///
    /// The name goes through [`fmt::Formatter::pad`] rather than `write!`, so
    /// width, fill and alignment flags supplied by the caller (for example
    /// `{:>10}`) are honored instead of being silently dropped. Output for a
    /// plain `{}` is unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            SchemaFormat::Avro => "Avro",
            SchemaFormat::Protobuf => "Protobuf",
        };
        f.pad(name)
    }
}
245
246impl PubSubError {
247    /// Creates a publish error from a message.
248    pub fn publish<S: Into<String>>(message: S) -> Self {
249        Self::PublishError {
250            message: message.into(),
251            source: None,
252        }
253    }
254
255    /// Creates a publish error with a source error.
256    pub fn publish_with_source<S: Into<String>>(
257        message: S,
258        source: Box<dyn std::error::Error + Send + Sync>,
259    ) -> Self {
260        Self::PublishError {
261            message: message.into(),
262            source: Some(source),
263        }
264    }
265
266    /// Creates a subscription error from a message.
267    pub fn subscription<S: Into<String>>(message: S) -> Self {
268        Self::SubscriptionError {
269            message: message.into(),
270            source: None,
271        }
272    }
273
274    /// Creates a subscription error with a source error.
275    pub fn subscription_with_source<S: Into<String>>(
276        message: S,
277        source: Box<dyn std::error::Error + Send + Sync>,
278    ) -> Self {
279        Self::SubscriptionError {
280            message: message.into(),
281            source: Some(source),
282        }
283    }
284
285    /// Creates an acknowledgment error from a message.
286    pub fn acknowledgment<S: Into<String>>(message: S) -> Self {
287        Self::AcknowledgmentError {
288            message: message.into(),
289            source: None,
290        }
291    }
292
293    /// Creates a configuration error.
294    pub fn configuration<S: Into<String>, P: Into<String>>(message: S, parameter: P) -> Self {
295        Self::ConfigurationError {
296            message: message.into(),
297            parameter: parameter.into(),
298        }
299    }
300
301    /// Creates a batching error.
302    pub fn batching<S: Into<String>>(message: S, batch_size: usize) -> Self {
303        Self::BatchingError {
304            message: message.into(),
305            batch_size,
306        }
307    }
308
309    /// Creates a flow control error.
310    pub fn flow_control<S: Into<String>>(
311        message: S,
312        current_count: usize,
313        max_count: usize,
314    ) -> Self {
315        Self::FlowControlError {
316            message: message.into(),
317            current_count,
318            max_count,
319        }
320    }
321
322    /// Creates a dead letter queue error.
323    pub fn dead_letter<S: Into<String>, M: Into<String>>(message: S, message_id: M) -> Self {
324        Self::DeadLetterQueueError {
325            message: message.into(),
326            message_id: message_id.into(),
327        }
328    }
329
330    /// Creates an ordering key error.
331    pub fn ordering_key<S: Into<String>, K: Into<String>>(message: S, ordering_key: K) -> Self {
332        Self::OrderingKeyError {
333            message: message.into(),
334            ordering_key: ordering_key.into(),
335        }
336    }
337
338    /// Creates a topic not found error.
339    pub fn topic_not_found<S: Into<String>>(topic_name: S) -> Self {
340        Self::TopicNotFound {
341            topic_name: topic_name.into(),
342        }
343    }
344
345    /// Creates a subscription not found error.
346    pub fn subscription_not_found<S: Into<String>>(subscription_name: S) -> Self {
347        Self::SubscriptionNotFound {
348            subscription_name: subscription_name.into(),
349        }
350    }
351
352    /// Creates a message too large error.
353    pub fn message_too_large(size: usize, max_size: usize) -> Self {
354        Self::MessageTooLarge { size, max_size }
355    }
356
357    /// Creates a timeout error.
358    pub fn timeout(duration_ms: u64) -> Self {
359        Self::Timeout { duration_ms }
360    }
361
362    /// Creates a resource exhausted error.
363    pub fn resource_exhausted<S: Into<String>>(resource: S, retry_after: Option<u64>) -> Self {
364        Self::ResourceExhausted {
365            resource: resource.into(),
366            retry_after,
367        }
368    }
369
370    /// Creates a permission denied error.
371    pub fn permission_denied<S: Into<String>>(operation: S) -> Self {
372        Self::PermissionDenied {
373            operation: operation.into(),
374        }
375    }
376
377    /// Checks if the error is retryable.
378    pub fn is_retryable(&self) -> bool {
379        matches!(
380            self,
381            Self::NetworkError { .. }
382                | Self::ResourceExhausted { .. }
383                | Self::Timeout { .. }
384                | Self::InternalError { .. }
385        )
386    }
387
388    /// Gets the retry after duration if available.
389    pub fn retry_after(&self) -> Option<u64> {
390        match self {
391            Self::ResourceExhausted { retry_after, .. } => *retry_after,
392            _ => None,
393        }
394    }
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// `publish` yields a `PublishError` whose rendered text includes the message.
    #[test]
    fn test_publish_error_creation() {
        let err = PubSubError::publish("test error");
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::PublishError { .. }));
        assert!(rendered.contains("test error"));
    }

    /// `subscription` yields a `SubscriptionError` whose rendered text includes the message.
    #[test]
    fn test_subscription_error_creation() {
        let err = PubSubError::subscription("test error");
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::SubscriptionError { .. }));
        assert!(rendered.contains("test error"));
    }

    /// `configuration` yields a `ConfigurationError` whose rendered text includes the message.
    #[test]
    fn test_configuration_error() {
        let err = PubSubError::configuration("invalid value", "timeout");
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::ConfigurationError { .. }));
        assert!(rendered.contains("invalid value"));
    }

    /// `message_too_large` renders both the actual and the maximum size.
    #[test]
    fn test_message_too_large_error() {
        let err = PubSubError::message_too_large(11000000, 10000000);
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::MessageTooLarge { .. }));
        assert!(rendered.contains("11000000"));
        assert!(rendered.contains("10000000"));
    }

    /// Network and timeout errors are retryable; configuration errors are not.
    #[test]
    fn test_retryable_errors() {
        let network = PubSubError::NetworkError {
            message: "connection reset".to_string(),
            source: None,
        };
        assert!(network.is_retryable());

        let timed_out = PubSubError::timeout(5000);
        assert!(timed_out.is_retryable());

        let misconfigured = PubSubError::configuration("bad value", "param");
        assert!(!misconfigured.is_retryable());
    }

    /// `retry_after` is populated only for `ResourceExhausted` errors.
    #[test]
    fn test_retry_after() {
        let exhausted = PubSubError::resource_exhausted("quota", Some(60));
        assert_eq!(exhausted.retry_after(), Some(60));

        let timed_out = PubSubError::timeout(1000);
        assert_eq!(timed_out.retry_after(), None);
    }

    /// `topic_not_found` yields a `TopicNotFound` whose rendered text includes the topic name.
    #[test]
    fn test_topic_not_found() {
        let err = PubSubError::topic_not_found("my-topic");
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::TopicNotFound { .. }));
        assert!(rendered.contains("my-topic"));
    }

    /// `flow_control` yields a `FlowControlError` whose rendered text includes the message.
    #[test]
    fn test_flow_control_error() {
        let err = PubSubError::flow_control("limit exceeded", 1000, 500);
        let rendered = err.to_string();
        assert!(matches!(err, PubSubError::FlowControlError { .. }));
        assert!(rendered.contains("limit exceeded"));
    }
}