Skip to main content

kafkit_client/core/
error.rs

1//! Error types returned by Kafkit.
2//!
3//! Most applications can use [`Result`] directly and match on [`crate::Error`] only
4//! when they need to handle a specific category.
5//!
6//! ```no_run
7//! # async fn example() -> kafkit_client::Result<()> {
8//! use kafkit_client::{Error, KafkaClient};
9//!
10//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
11//! if let Err(Error::Admin(error)) = admin.create_topics(Vec::<kafkit_client::NewTopic>::new()).await {
12//!     eprintln!("admin request failed: {error}");
13//! }
14//! # Ok(())
15//! # }
16//! ```
17//!
18use thiserror::Error;
19use tokio::task::JoinError;
20
21/// Result type used by the client APIs.
22pub type Result<T> = std::result::Result<T, Error>;
23
24#[derive(Debug, Error)]
25/// Top-level error returned by the crate.
26pub enum Error {
27    /// Operation was cancelled before it completed.
28    #[error("operation cancelled")]
29    Cancelled,
30    /// Admin operation failed.
31    #[error(transparent)]
32    Admin(#[from] AdminError),
33    /// Consumer operation failed.
34    #[error(transparent)]
35    Consumer(#[from] ConsumerError),
36    /// Producer operation failed.
37    #[error(transparent)]
38    Producer(#[from] ProducerError),
39    /// Consumer group metadata was invalid.
40    #[error(transparent)]
41    ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
42    /// Transaction state prevented the operation.
43    #[error(transparent)]
44    TransactionState(#[from] TransactionStateError),
45    /// Internal error from protocol handling, IO, or validation.
46    #[error(transparent)]
47    Internal(#[from] anyhow::Error),
48}
49
50#[derive(Debug, Error)]
51/// Errors raised before or during admin operations.
52pub enum AdminError {
53    /// A topic name was empty.
54    #[error("topic names must be non-empty")]
55    EmptyTopicName,
56    /// A topic was requested with an invalid partition count.
57    #[error("topic partition count must be positive: {partitions}")]
58    InvalidPartitionCount {
59        /// Requested partition count.
60        partitions: i32,
61    },
62    /// A topic was requested with an invalid replication factor.
63    #[error("topic replication factor must be positive: {replication_factor}")]
64    InvalidReplicationFactor {
65        /// Requested replication factor.
66        replication_factor: i16,
67    },
68}
69
70#[derive(Debug, Error)]
71/// Errors raised by the consumer API.
72pub enum ConsumerError {
73    /// The runtime stopped before the operation was sent.
74    #[error("consumer runtime stopped before {operation}")]
75    ThreadStoppedBefore {
76        /// Operation that was waiting to be sent.
77        operation: &'static str,
78    },
79    /// The runtime stopped while the operation was waiting for a reply.
80    #[error("consumer runtime stopped during {operation}")]
81    ThreadStoppedDuring {
82        /// Operation that was in flight.
83        operation: &'static str,
84    },
85    /// The background consumer task could not be joined.
86    #[error("failed to join consumer runtime: {0}")]
87    Join(#[source] JoinError),
88    /// A subscription was requested without any topic names.
89    #[error("subscribe requires at least one non-empty topic name")]
90    EmptySubscription,
91    /// A regex subscription was requested with an empty pattern.
92    #[error("subscribe_pattern requires a non-empty pattern")]
93    EmptySubscriptionPattern,
94    /// A regex subscription could not be compiled locally.
95    #[error("subscribe_regex requires a valid regular expression: {message}")]
96    InvalidSubscriptionRegex {
97        /// Regex parser error message.
98        message: String,
99    },
100    /// Another poll call was already active.
101    #[error("concurrent poll calls are not supported by this simple consumer")]
102    ConcurrentPoll,
103    /// A blocking poll was interrupted with `wakeup`.
104    #[error("poll was interrupted by wakeup()")]
105    Wakeup,
106    /// A seek offset was negative.
107    #[error("seek offset must be non-negative: {offset}")]
108    InvalidSeekOffset {
109        /// Requested offset.
110        offset: i64,
111    },
112    /// A topic partition had an empty topic name.
113    #[error("topic partition names must be non-empty")]
114    EmptyTopicPartition,
115    /// The operation needs a partition currently assigned to this consumer.
116    #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
117    PartitionNotAssigned {
118        /// Operation that needed the assignment.
119        operation: &'static str,
120        /// Topic name.
121        topic: String,
122        /// Partition number.
123        partition: i32,
124    },
125    /// The broker rejected the subscription regex.
126    #[error("broker rejected the subscription regex: {message}")]
127    InvalidRegularExpression {
128        /// Broker error message.
129        message: String,
130    },
131    /// The broker rejected the configured server-side assignor.
132    #[error("broker rejected the configured server assignor '{assignor}': {message}")]
133    UnsupportedAssignor {
134        /// Requested assignor name.
135        assignor: String,
136        /// Broker error message.
137        message: String,
138    },
139    /// A static member id is still owned by another consumer instance.
140    #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
141    UnreleasedInstanceId {
142        /// Static member instance id.
143        instance_id: String,
144        /// Broker error message.
145        message: String,
146    },
147    /// The broker fenced this static member instance.
148    #[error("static member '{instance_id}' was fenced: {message}")]
149    FencedInstanceId {
150        /// Static member instance id.
151        instance_id: String,
152        /// Broker error message.
153        message: String,
154    },
155}
156
157#[derive(Debug, Error)]
158/// Errors raised by the producer API.
159pub enum ProducerError {
160    /// Idempotence was enabled without `acks=-1`.
161    #[error("idempotent producers require acks=-1")]
162    IdempotenceRequiresAcksAll,
163    /// Idempotence was enabled without retries.
164    #[error("idempotent producers require max_retries > 0")]
165    IdempotenceRequiresRetries,
166    /// Transactions require all replicas to acknowledge records.
167    #[error(
168        "transactional producers require acks=-1 so the broker can commit the full transaction"
169    )]
170    TransactionalRequiresAcksAll,
171    /// The broker did not report the transaction feature level.
172    #[error("broker did not advertise finalized feature level for transaction.version")]
173    MissingTransactionVersionFeature,
174    /// The broker transaction feature level is too old.
175    #[error(
176        "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
177    )]
178    UnsupportedTransactionVersion {
179        /// Broker feature level.
180        level: i16,
181    },
182    /// A broker API version is older than this client needs.
183    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
184    UnsupportedApiVersion {
185        /// API name.
186        api: &'static str,
187        /// Minimum required API version.
188        min_version: i16,
189        /// Version advertised by the broker.
190        broker_version: i16,
191    },
192    /// The operation requires a configured transactional id.
193    #[error("producer is not configured with a transactional_id")]
194    NotTransactional,
195    /// The sender task stopped before the operation was sent.
196    #[error("producer sender thread stopped before {operation}")]
197    ThreadStoppedBefore {
198        /// Operation that was waiting to be sent.
199        operation: &'static str,
200    },
201    /// The sender task stopped while the operation was waiting for a reply.
202    #[error("producer sender thread stopped during {operation}")]
203    ThreadStoppedDuring {
204        /// Operation that was in flight.
205        operation: &'static str,
206    },
207    /// The sender task could not be joined.
208    #[error("failed to join producer sender thread: {0}")]
209    Join(#[source] JoinError),
210}
211
212#[derive(Debug, Error)]
213/// Errors found while validating consumer group metadata.
214pub enum ConsumerGroupMetadataError {
215    /// The group id was empty.
216    #[error("consumer group metadata requires a non-empty group_id")]
217    EmptyGroupId,
218    /// Active group metadata needs a member id.
219    #[error("consumer group metadata has generation_id > 0 but no member_id")]
220    MissingMemberId,
221}
222
223#[derive(Debug, Error)]
224/// Errors raised by the producer transaction state machine.
225pub enum TransactionStateError {
226    /// A transaction is already open.
227    #[error("transaction already in progress")]
228    AlreadyInProgress,
229    /// The transaction is already completing.
230    #[error("transaction is already completing with {0}")]
231    Completing(&'static str),
232    /// The transaction must be aborted before it can be used again.
233    #[error("transaction must be aborted before reuse: {0}")]
234    MustAbortBeforeReuse(String),
235    /// The transaction is permanently failed.
236    #[error("transaction is unusable: {0}")]
237    Fatal(String),
238    /// Records were sent before a transaction was started.
239    #[error("transactional send requires begin_transaction() before send()")]
240    AppendWithoutBegin,
241    /// Offsets were sent before a transaction was started.
242    #[error("send_offsets_to_transaction requires begin_transaction() first")]
243    SendOffsetsWithoutBegin,
244    /// The transaction needs an abort before more work can happen.
245    #[error("transaction has failed and must be aborted: {0}")]
246    AbortRequired(String),
247    /// The transaction cannot be committed and must be aborted.
248    #[error("transaction cannot be committed and must be aborted: {0}")]
249    CommitRequiresAbort(String),
250    /// There is no active transaction to finish.
251    #[error("no active transaction to complete")]
252    NoActiveTransaction,
253    /// Shutdown found an active transaction.
254    #[error("shutdown stopped with an active transaction still in progress")]
255    ShutdownWithActiveTransaction,
256    /// Shutdown found a transaction completion in progress.
257    #[error("shutdown stopped while transaction was still completing with {0}")]
258    ShutdownWhileCompleting(&'static str),
259    /// Shutdown found a failed transaction that still needs aborting.
260    #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
261    ShutdownAbortRequired(String),
262    /// Shutdown found a fatal transaction error.
263    #[error("transaction failed before shutdown: {0}")]
264    ShutdownFatal(String),
265}