1#[cfg(feature = "schema")]
7use std::fmt;
8
9pub type Result<T> = std::result::Result<T, PubSubError>;
11
12#[derive(Debug, thiserror::Error)]
14pub enum PubSubError {
15 #[error("Publishing error: {message}")]
17 PublishError {
18 message: String,
20 #[source]
22 source: Option<Box<dyn std::error::Error + Send + Sync>>,
23 },
24
25 #[error("Subscription error: {message}")]
27 SubscriptionError {
28 message: String,
30 #[source]
32 source: Option<Box<dyn std::error::Error + Send + Sync>>,
33 },
34
35 #[error("Acknowledgment error: {message}")]
37 AcknowledgmentError {
38 message: String,
40 #[source]
42 source: Option<Box<dyn std::error::Error + Send + Sync>>,
43 },
44
45 #[cfg(feature = "schema")]
47 #[error("Schema validation error: {message}")]
48 SchemaValidationError {
49 message: String,
51 schema_id: Option<String>,
53 },
54
55 #[cfg(feature = "schema")]
57 #[error("Schema encoding error: {message}")]
58 SchemaEncodingError {
59 message: String,
61 format: SchemaFormat,
63 },
64
65 #[cfg(feature = "schema")]
67 #[error("Schema decoding error: {message}")]
68 SchemaDecodingError {
69 message: String,
71 format: SchemaFormat,
73 },
74
75 #[error("Batching error: {message}")]
77 BatchingError {
78 message: String,
80 batch_size: usize,
82 },
83
84 #[error("Flow control error: {message}")]
86 FlowControlError {
87 message: String,
89 current_count: usize,
91 max_count: usize,
93 },
94
95 #[error("Dead letter queue error: {message}")]
97 DeadLetterQueueError {
98 message: String,
100 message_id: String,
102 },
103
104 #[error("Ordering key error: {message}")]
106 OrderingKeyError {
107 message: String,
109 ordering_key: String,
111 },
112
113 #[cfg(feature = "monitoring")]
115 #[error("Monitoring error: {message}")]
116 MonitoringError {
117 message: String,
119 #[source]
121 source: Option<Box<dyn std::error::Error + Send + Sync>>,
122 },
123
124 #[error("Authentication error: {message}")]
126 AuthenticationError {
127 message: String,
129 #[source]
131 source: Option<Box<dyn std::error::Error + Send + Sync>>,
132 },
133
134 #[error("Configuration error: {message}")]
136 ConfigurationError {
137 message: String,
139 parameter: String,
141 },
142
143 #[error("Topic not found: {topic_name}")]
145 TopicNotFound {
146 topic_name: String,
148 },
149
150 #[error("Subscription not found: {subscription_name}")]
152 SubscriptionNotFound {
153 subscription_name: String,
155 },
156
157 #[error("Message too large: {size} bytes (max: {max_size} bytes)")]
159 MessageTooLarge {
160 size: usize,
162 max_size: usize,
164 },
165
166 #[error("Invalid message format: {message}")]
168 InvalidMessageFormat {
169 message: String,
171 },
172
173 #[error("Operation timed out after {duration_ms}ms")]
175 Timeout {
176 duration_ms: u64,
178 },
179
180 #[error("Network error: {message}")]
182 NetworkError {
183 message: String,
185 #[source]
187 source: Option<Box<dyn std::error::Error + Send + Sync>>,
188 },
189
190 #[error("Resource exhausted: {resource}")]
192 ResourceExhausted {
193 resource: String,
195 retry_after: Option<u64>,
197 },
198
199 #[error("Permission denied: {operation}")]
201 PermissionDenied {
202 operation: String,
204 },
205
206 #[error("Internal error: {message}")]
208 InternalError {
209 message: String,
211 },
212
213 #[error("I/O error: {0}")]
215 Io(#[from] std::io::Error),
216
217 #[error("JSON error: {0}")]
219 Json(#[from] serde_json::Error),
220
221 #[error("Pub/Sub client error: {0}")]
223 ClientError(String),
224}
225
226#[cfg(feature = "schema")]
228#[derive(Debug, Clone, Copy, PartialEq, Eq)]
229pub enum SchemaFormat {
230 Avro,
232 Protobuf,
234}
235
236#[cfg(feature = "schema")]
237impl fmt::Display for SchemaFormat {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 match self {
240 SchemaFormat::Avro => write!(f, "Avro"),
241 SchemaFormat::Protobuf => write!(f, "Protobuf"),
242 }
243 }
244}
245
246impl PubSubError {
247 pub fn publish<S: Into<String>>(message: S) -> Self {
249 Self::PublishError {
250 message: message.into(),
251 source: None,
252 }
253 }
254
255 pub fn publish_with_source<S: Into<String>>(
257 message: S,
258 source: Box<dyn std::error::Error + Send + Sync>,
259 ) -> Self {
260 Self::PublishError {
261 message: message.into(),
262 source: Some(source),
263 }
264 }
265
266 pub fn subscription<S: Into<String>>(message: S) -> Self {
268 Self::SubscriptionError {
269 message: message.into(),
270 source: None,
271 }
272 }
273
274 pub fn subscription_with_source<S: Into<String>>(
276 message: S,
277 source: Box<dyn std::error::Error + Send + Sync>,
278 ) -> Self {
279 Self::SubscriptionError {
280 message: message.into(),
281 source: Some(source),
282 }
283 }
284
285 pub fn acknowledgment<S: Into<String>>(message: S) -> Self {
287 Self::AcknowledgmentError {
288 message: message.into(),
289 source: None,
290 }
291 }
292
293 pub fn configuration<S: Into<String>, P: Into<String>>(message: S, parameter: P) -> Self {
295 Self::ConfigurationError {
296 message: message.into(),
297 parameter: parameter.into(),
298 }
299 }
300
301 pub fn batching<S: Into<String>>(message: S, batch_size: usize) -> Self {
303 Self::BatchingError {
304 message: message.into(),
305 batch_size,
306 }
307 }
308
309 pub fn flow_control<S: Into<String>>(
311 message: S,
312 current_count: usize,
313 max_count: usize,
314 ) -> Self {
315 Self::FlowControlError {
316 message: message.into(),
317 current_count,
318 max_count,
319 }
320 }
321
322 pub fn dead_letter<S: Into<String>, M: Into<String>>(message: S, message_id: M) -> Self {
324 Self::DeadLetterQueueError {
325 message: message.into(),
326 message_id: message_id.into(),
327 }
328 }
329
330 pub fn ordering_key<S: Into<String>, K: Into<String>>(message: S, ordering_key: K) -> Self {
332 Self::OrderingKeyError {
333 message: message.into(),
334 ordering_key: ordering_key.into(),
335 }
336 }
337
338 pub fn topic_not_found<S: Into<String>>(topic_name: S) -> Self {
340 Self::TopicNotFound {
341 topic_name: topic_name.into(),
342 }
343 }
344
345 pub fn subscription_not_found<S: Into<String>>(subscription_name: S) -> Self {
347 Self::SubscriptionNotFound {
348 subscription_name: subscription_name.into(),
349 }
350 }
351
352 pub fn message_too_large(size: usize, max_size: usize) -> Self {
354 Self::MessageTooLarge { size, max_size }
355 }
356
357 pub fn timeout(duration_ms: u64) -> Self {
359 Self::Timeout { duration_ms }
360 }
361
362 pub fn resource_exhausted<S: Into<String>>(resource: S, retry_after: Option<u64>) -> Self {
364 Self::ResourceExhausted {
365 resource: resource.into(),
366 retry_after,
367 }
368 }
369
370 pub fn permission_denied<S: Into<String>>(operation: S) -> Self {
372 Self::PermissionDenied {
373 operation: operation.into(),
374 }
375 }
376
377 pub fn is_retryable(&self) -> bool {
379 matches!(
380 self,
381 Self::NetworkError { .. }
382 | Self::ResourceExhausted { .. }
383 | Self::Timeout { .. }
384 | Self::InternalError { .. }
385 )
386 }
387
388 pub fn retry_after(&self) -> Option<u64> {
390 match self {
391 Self::ResourceExhausted { retry_after, .. } => *retry_after,
392 _ => None,
393 }
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400
401 #[test]
402 fn test_publish_error_creation() {
403 let error = PubSubError::publish("test error");
404 assert!(matches!(error, PubSubError::PublishError { .. }));
405 assert!(error.to_string().contains("test error"));
406 }
407
408 #[test]
409 fn test_subscription_error_creation() {
410 let error = PubSubError::subscription("test error");
411 assert!(matches!(error, PubSubError::SubscriptionError { .. }));
412 assert!(error.to_string().contains("test error"));
413 }
414
415 #[test]
416 fn test_configuration_error() {
417 let error = PubSubError::configuration("invalid value", "timeout");
418 assert!(matches!(error, PubSubError::ConfigurationError { .. }));
419 assert!(error.to_string().contains("invalid value"));
420 }
421
422 #[test]
423 fn test_message_too_large_error() {
424 let error = PubSubError::message_too_large(11000000, 10000000);
425 assert!(matches!(error, PubSubError::MessageTooLarge { .. }));
426 assert!(error.to_string().contains("11000000"));
427 assert!(error.to_string().contains("10000000"));
428 }
429
430 #[test]
431 fn test_retryable_errors() {
432 let network_error = PubSubError::NetworkError {
433 message: "connection reset".to_string(),
434 source: None,
435 };
436 assert!(network_error.is_retryable());
437
438 let timeout_error = PubSubError::timeout(5000);
439 assert!(timeout_error.is_retryable());
440
441 let config_error = PubSubError::configuration("bad value", "param");
442 assert!(!config_error.is_retryable());
443 }
444
445 #[test]
446 fn test_retry_after() {
447 let error = PubSubError::resource_exhausted("quota", Some(60));
448 assert_eq!(error.retry_after(), Some(60));
449
450 let error = PubSubError::timeout(1000);
451 assert_eq!(error.retry_after(), None);
452 }
453
454 #[test]
455 fn test_topic_not_found() {
456 let error = PubSubError::topic_not_found("my-topic");
457 assert!(matches!(error, PubSubError::TopicNotFound { .. }));
458 assert!(error.to_string().contains("my-topic"));
459 }
460
461 #[test]
462 fn test_flow_control_error() {
463 let error = PubSubError::flow_control("limit exceeded", 1000, 500);
464 assert!(matches!(error, PubSubError::FlowControlError { .. }));
465 assert!(error.to_string().contains("limit exceeded"));
466 }
467}