rabbitmq_stream_client/
error.rs1use rabbitmq_stream_protocol::{
2 error::{DecodeError, EncodeError},
3 ResponseCode,
4};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum ClientError {
9 #[error(transparent)]
10 Io(#[from] std::io::Error),
11
12 #[error(transparent)]
13 Protocol(#[from] ProtocolError),
14 #[error("Cast Error: {0}")]
15 CastError(String),
16 #[error(transparent)]
17 GenericError(#[from] Box<dyn std::error::Error + Send + Sync>),
18 #[error("Client already closed")]
19 AlreadyClosed,
20 #[error("Connection closed")]
21 ConnectionClosed,
22 #[error(transparent)]
23 Tls(#[from] tokio_rustls::rustls::Error),
24 #[error("Request error: {0:?}")]
25 RequestError(ResponseCode),
26}
27
28#[derive(Error, Debug)]
29pub enum ConsumerStoreOffsetError {
30 #[error("Failed to store offset, missing consumer name")]
31 NameMissing,
32 #[error(transparent)]
33 Client(#[from] ClientError),
34}
35
36#[derive(Error, Debug)]
37pub enum ProtocolError {
38 #[error("Encode Error {0:?}")]
39 Encode(EncodeError),
40 #[error("Decode Error {0:?}")]
41 Decode(DecodeError),
42}
43
44impl From<EncodeError> for ClientError {
45 fn from(err: EncodeError) -> Self {
46 ClientError::Protocol(ProtocolError::Encode(err))
47 }
48}
49
50impl From<DecodeError> for ClientError {
51 fn from(err: DecodeError) -> Self {
52 ClientError::Protocol(ProtocolError::Decode(err))
53 }
54}
55
56#[derive(Error, Debug)]
57pub enum StreamCreateError {
58 #[error("Failed to create stream {stream} status: {status:?}")]
59 Create {
60 stream: String,
61 status: ResponseCode,
62 },
63 #[error(transparent)]
64 Client(#[from] ClientError),
65}
66
67#[derive(Error, Debug)]
68pub enum StreamDeleteError {
69 #[error("Failed to delete stream {stream} status: {status:?}")]
70 Delete {
71 stream: String,
72 status: ResponseCode,
73 },
74 #[error(transparent)]
75 Client(#[from] ClientError),
76}
77
78#[derive(Error, Debug)]
79pub enum ProducerCreateError {
80 #[error("Failed to create producer for stream {stream} status {status:?}")]
81 Create {
82 stream: String,
83 status: ResponseCode,
84 },
85
86 #[error("Stream {stream} does not exist")]
87 StreamDoesNotExist { stream: String },
88
89 #[error(transparent)]
90 Client(#[from] ClientError),
91
92 #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")]
93 FilteringNotSupport,
94}
95
96#[derive(Error, Debug)]
97pub enum ProducerPublishError {
98 #[error("Failed to publish message for stream {stream} status {status:?}")]
99 Create {
100 stream: String,
101 publisher_id: u8,
102 status: ResponseCode,
103 },
104 #[error("Failed to send batch messages for stream {stream}")]
105 Batch { stream: String },
106 #[error("Failed to publish message, the producer is closed")]
107 Closed,
108 #[error("Failed to publish message, confirmation channel returned None for stream {stream}")]
109 Confirmation { stream: String },
110 #[error(transparent)]
111 Client(#[from] ClientError),
112 #[error("Failed to publish message, timeout")]
113 Timeout,
114}
115
116#[derive(Error, Debug)]
117pub enum SuperStreamProducerPublishError {
118 #[error("Failed to send message to stream")]
119 ProducerPublishError(),
120 #[error("Failed to create a producer")]
121 ProducerCreateError(),
122}
123
124impl From<ProducerPublishError> for SuperStreamProducerPublishError {
125 fn from(_err: ProducerPublishError) -> Self {
126 SuperStreamProducerPublishError::ProducerPublishError()
127 }
128}
129impl From<ProducerCreateError> for SuperStreamProducerPublishError {
130 fn from(_err: ProducerCreateError) -> Self {
131 SuperStreamProducerPublishError::ProducerCreateError()
132 }
133}
134
135#[derive(Error, Debug)]
136pub enum ProducerCloseError {
137 #[error("Failed to close producer for stream {stream} status {status:?}")]
138 Close {
139 stream: String,
140 status: ResponseCode,
141 },
142 #[error("Producer already closed")]
143 AlreadyClosed,
144 #[error(transparent)]
145 Client(#[from] ClientError),
146}
147
148#[derive(Error, Debug)]
149pub enum ConsumerCreateError {
150 #[error("Failed to create consumer for stream {stream} status {status:?}")]
151 Create {
152 stream: String,
153 status: ResponseCode,
154 },
155
156 #[error("Stream {stream} does not exist")]
157 StreamDoesNotExist { stream: String },
158
159 #[error(transparent)]
160 Client(#[from] ClientError),
161
162 #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")]
163 FilteringNotSupport,
164
165 #[error("if you set single active consumer a consumer and super_stream consumer name need to be setup")]
166 SingleActiveConsumerNotSupported,
167}
168
169#[derive(Error, Debug)]
170pub enum ConsumerDeliveryError {
171 #[error("Failed to create consumer for stream {stream} status {status:?}")]
172 Credit {
173 stream: String,
174 status: ResponseCode,
175 },
176 #[error(transparent)]
177 Client(#[from] ClientError),
178}
179#[derive(Error, Debug)]
180pub enum ConsumerCloseError {
181 #[error("Failed to close consumer for stream {stream} status {status:?}")]
182 Close {
183 stream: String,
184 status: ResponseCode,
185 },
186 #[error("Consumer already closed")]
187 AlreadyClosed,
188 #[error(transparent)]
189 Client(#[from] ClientError),
190}