rabbitmq_stream_client/
error.rs

1use rabbitmq_stream_protocol::{
2    error::{DecodeError, EncodeError},
3    ResponseCode,
4};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum ClientError {
9    #[error(transparent)]
10    Io(#[from] std::io::Error),
11
12    #[error(transparent)]
13    Protocol(#[from] ProtocolError),
14    #[error("Cast Error: {0}")]
15    CastError(String),
16    #[error(transparent)]
17    GenericError(#[from] Box<dyn std::error::Error + Send + Sync>),
18    #[error("Client already closed")]
19    AlreadyClosed,
20    #[error("Connection closed")]
21    ConnectionClosed,
22    #[error(transparent)]
23    Tls(#[from] tokio_rustls::rustls::Error),
24    #[error("Request error: {0:?}")]
25    RequestError(ResponseCode),
26}
27
28#[derive(Error, Debug)]
29pub enum ConsumerStoreOffsetError {
30    #[error("Failed to store offset, missing consumer name")]
31    NameMissing,
32    #[error(transparent)]
33    Client(#[from] ClientError),
34}
35
36#[derive(Error, Debug)]
37pub enum ProtocolError {
38    #[error("Encode Error {0:?}")]
39    Encode(EncodeError),
40    #[error("Decode Error {0:?}")]
41    Decode(DecodeError),
42}
43
44impl From<EncodeError> for ClientError {
45    fn from(err: EncodeError) -> Self {
46        ClientError::Protocol(ProtocolError::Encode(err))
47    }
48}
49
50impl From<DecodeError> for ClientError {
51    fn from(err: DecodeError) -> Self {
52        ClientError::Protocol(ProtocolError::Decode(err))
53    }
54}
55
56#[derive(Error, Debug)]
57pub enum StreamCreateError {
58    #[error("Failed to create stream {stream} status: {status:?}")]
59    Create {
60        stream: String,
61        status: ResponseCode,
62    },
63    #[error(transparent)]
64    Client(#[from] ClientError),
65}
66
67#[derive(Error, Debug)]
68pub enum StreamDeleteError {
69    #[error("Failed to delete stream {stream} status: {status:?}")]
70    Delete {
71        stream: String,
72        status: ResponseCode,
73    },
74    #[error(transparent)]
75    Client(#[from] ClientError),
76}
77
78#[derive(Error, Debug)]
79pub enum ProducerCreateError {
80    #[error("Failed to create producer for stream {stream} status {status:?}")]
81    Create {
82        stream: String,
83        status: ResponseCode,
84    },
85
86    #[error("Stream {stream} does not exist")]
87    StreamDoesNotExist { stream: String },
88
89    #[error(transparent)]
90    Client(#[from] ClientError),
91
92    #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")]
93    FilteringNotSupport,
94}
95
96#[derive(Error, Debug)]
97pub enum ProducerPublishError {
98    #[error("Failed to publish message for stream {stream} status {status:?}")]
99    Create {
100        stream: String,
101        publisher_id: u8,
102        status: ResponseCode,
103    },
104    #[error("Failed to send batch messages for stream {stream}")]
105    Batch { stream: String },
106    #[error("Failed to publish message, the producer is closed")]
107    Closed,
108    #[error("Failed to publish message, confirmation channel returned None for stream {stream}")]
109    Confirmation { stream: String },
110    #[error(transparent)]
111    Client(#[from] ClientError),
112    #[error("Failed to publish message, timeout")]
113    Timeout,
114}
115
116#[derive(Error, Debug)]
117pub enum SuperStreamProducerPublishError {
118    #[error("Failed to send message to stream")]
119    ProducerPublishError(),
120    #[error("Failed to create a producer")]
121    ProducerCreateError(),
122}
123
124impl From<ProducerPublishError> for SuperStreamProducerPublishError {
125    fn from(_err: ProducerPublishError) -> Self {
126        SuperStreamProducerPublishError::ProducerPublishError()
127    }
128}
129impl From<ProducerCreateError> for SuperStreamProducerPublishError {
130    fn from(_err: ProducerCreateError) -> Self {
131        SuperStreamProducerPublishError::ProducerCreateError()
132    }
133}
134
135#[derive(Error, Debug)]
136pub enum ProducerCloseError {
137    #[error("Failed to close producer for stream {stream} status {status:?}")]
138    Close {
139        stream: String,
140        status: ResponseCode,
141    },
142    #[error("Producer already closed")]
143    AlreadyClosed,
144    #[error(transparent)]
145    Client(#[from] ClientError),
146}
147
148#[derive(Error, Debug)]
149pub enum ConsumerCreateError {
150    #[error("Failed to create consumer for stream {stream} status {status:?}")]
151    Create {
152        stream: String,
153        status: ResponseCode,
154    },
155
156    #[error("Stream {stream} does not exist")]
157    StreamDoesNotExist { stream: String },
158
159    #[error(transparent)]
160    Client(#[from] ClientError),
161
162    #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")]
163    FilteringNotSupport,
164
165    #[error("if you set single active consumer a consumer and super_stream consumer name need to be setup")]
166    SingleActiveConsumerNotSupported,
167}
168
169#[derive(Error, Debug)]
170pub enum ConsumerDeliveryError {
171    #[error("Failed to create consumer for stream {stream} status {status:?}")]
172    Credit {
173        stream: String,
174        status: ResponseCode,
175    },
176    #[error(transparent)]
177    Client(#[from] ClientError),
178}
179#[derive(Error, Debug)]
180pub enum ConsumerCloseError {
181    #[error("Failed to close consumer for stream {stream} status {status:?}")]
182    Close {
183        stream: String,
184        status: ResponseCode,
185    },
186    #[error("Consumer already closed")]
187    AlreadyClosed,
188    #[error(transparent)]
189    Client(#[from] ClientError),
190}