Skip to main content

kafkit_client/core/
error.rs

1//! Error types returned by Kafkit.
2//!
3//! Most applications can use [`Result`] directly and match on [`crate::Error`] only
4//! when they need to handle a specific category.
5//!
6//! ```no_run
7//! # async fn example() -> kafkit_client::Result<()> {
8//! use kafkit_client::{Error, KafkaClient};
9//!
10//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
11//! if let Err(Error::Admin(error)) = admin.create_topics(Vec::<kafkit_client::NewTopic>::new()).await {
12//!     eprintln!("admin request failed: {error}");
13//! }
14//! # Ok(())
15//! # }
16//! ```
17//!
18use thiserror::Error;
19use tokio::task::JoinError;
20
21/// Result type used by the client APIs.
22pub type Result<T> = std::result::Result<T, Error>;
23
24#[derive(Debug, Error)]
25/// Top-level error returned by the crate.
26pub enum Error {
27    /// Admin operation failed.
28    #[error(transparent)]
29    Admin(#[from] AdminError),
30    /// Consumer operation failed.
31    #[error(transparent)]
32    Consumer(#[from] ConsumerError),
33    /// Producer operation failed.
34    #[error(transparent)]
35    Producer(#[from] ProducerError),
36    /// Consumer group metadata was invalid.
37    #[error(transparent)]
38    ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
39    /// Transaction state prevented the operation.
40    #[error(transparent)]
41    TransactionState(#[from] TransactionStateError),
42    /// Internal error from protocol handling, IO, or validation.
43    #[error(transparent)]
44    Internal(#[from] anyhow::Error),
45}
46
47#[derive(Debug, Error)]
48/// Errors raised before or during admin operations.
49pub enum AdminError {
50    /// A topic name was empty.
51    #[error("topic names must be non-empty")]
52    EmptyTopicName,
53    /// A topic was requested with an invalid partition count.
54    #[error("topic partition count must be positive: {partitions}")]
55    InvalidPartitionCount {
56        /// Requested partition count.
57        partitions: i32,
58    },
59    /// A topic was requested with an invalid replication factor.
60    #[error("topic replication factor must be positive: {replication_factor}")]
61    InvalidReplicationFactor {
62        /// Requested replication factor.
63        replication_factor: i16,
64    },
65}
66
67#[derive(Debug, Error)]
68/// Errors raised by the consumer API.
69pub enum ConsumerError {
70    /// The runtime stopped before the operation was sent.
71    #[error("consumer runtime stopped before {operation}")]
72    ThreadStoppedBefore {
73        /// Operation that was waiting to be sent.
74        operation: &'static str,
75    },
76    /// The runtime stopped while the operation was waiting for a reply.
77    #[error("consumer runtime stopped during {operation}")]
78    ThreadStoppedDuring {
79        /// Operation that was in flight.
80        operation: &'static str,
81    },
82    /// The background consumer task could not be joined.
83    #[error("failed to join consumer runtime: {0}")]
84    Join(#[source] JoinError),
85    /// A subscription was requested without any topic names.
86    #[error("subscribe requires at least one non-empty topic name")]
87    EmptySubscription,
88    /// A regex subscription was requested with an empty pattern.
89    #[error("subscribe_pattern requires a non-empty pattern")]
90    EmptySubscriptionPattern,
91    /// A regex subscription could not be compiled locally.
92    #[error("subscribe_regex requires a valid regular expression: {message}")]
93    InvalidSubscriptionRegex {
94        /// Regex parser error message.
95        message: String,
96    },
97    /// Another poll call was already active.
98    #[error("concurrent poll calls are not supported by this simple consumer")]
99    ConcurrentPoll,
100    /// A blocking poll was interrupted with `wakeup`.
101    #[error("poll was interrupted by wakeup()")]
102    Wakeup,
103    /// A seek offset was negative.
104    #[error("seek offset must be non-negative: {offset}")]
105    InvalidSeekOffset {
106        /// Requested offset.
107        offset: i64,
108    },
109    /// A topic partition had an empty topic name.
110    #[error("topic partition names must be non-empty")]
111    EmptyTopicPartition,
112    /// The operation needs a partition currently assigned to this consumer.
113    #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
114    PartitionNotAssigned {
115        /// Operation that needed the assignment.
116        operation: &'static str,
117        /// Topic name.
118        topic: String,
119        /// Partition number.
120        partition: i32,
121    },
122    /// The broker rejected the subscription regex.
123    #[error("broker rejected the subscription regex: {message}")]
124    InvalidRegularExpression {
125        /// Broker error message.
126        message: String,
127    },
128    /// The broker rejected the configured server-side assignor.
129    #[error("broker rejected the configured server assignor '{assignor}': {message}")]
130    UnsupportedAssignor {
131        /// Requested assignor name.
132        assignor: String,
133        /// Broker error message.
134        message: String,
135    },
136    /// A static member id is still owned by another consumer instance.
137    #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
138    UnreleasedInstanceId {
139        /// Static member instance id.
140        instance_id: String,
141        /// Broker error message.
142        message: String,
143    },
144    /// The broker fenced this static member instance.
145    #[error("static member '{instance_id}' was fenced: {message}")]
146    FencedInstanceId {
147        /// Static member instance id.
148        instance_id: String,
149        /// Broker error message.
150        message: String,
151    },
152}
153
154#[derive(Debug, Error)]
155/// Errors raised by the producer API.
156pub enum ProducerError {
157    /// Idempotence was enabled without `acks=-1`.
158    #[error("idempotent producers require acks=-1")]
159    IdempotenceRequiresAcksAll,
160    /// Idempotence was enabled without retries.
161    #[error("idempotent producers require max_retries > 0")]
162    IdempotenceRequiresRetries,
163    /// Transactions require all replicas to acknowledge records.
164    #[error(
165        "transactional producers require acks=-1 so the broker can commit the full transaction"
166    )]
167    TransactionalRequiresAcksAll,
168    /// The broker did not report the transaction feature level.
169    #[error("broker did not advertise finalized feature level for transaction.version")]
170    MissingTransactionVersionFeature,
171    /// The broker transaction feature level is too old.
172    #[error(
173        "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
174    )]
175    UnsupportedTransactionVersion {
176        /// Broker feature level.
177        level: i16,
178    },
179    /// A broker API version is older than this client needs.
180    #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
181    UnsupportedApiVersion {
182        /// API name.
183        api: &'static str,
184        /// Minimum required API version.
185        min_version: i16,
186        /// Version advertised by the broker.
187        broker_version: i16,
188    },
189    /// The operation requires a configured transactional id.
190    #[error("producer is not configured with a transactional_id")]
191    NotTransactional,
192    /// The sender task stopped before the operation was sent.
193    #[error("producer sender thread stopped before {operation}")]
194    ThreadStoppedBefore {
195        /// Operation that was waiting to be sent.
196        operation: &'static str,
197    },
198    /// The sender task stopped while the operation was waiting for a reply.
199    #[error("producer sender thread stopped during {operation}")]
200    ThreadStoppedDuring {
201        /// Operation that was in flight.
202        operation: &'static str,
203    },
204    /// The sender task could not be joined.
205    #[error("failed to join producer sender thread: {0}")]
206    Join(#[source] JoinError),
207}
208
209#[derive(Debug, Error)]
210/// Errors found while validating consumer group metadata.
211pub enum ConsumerGroupMetadataError {
212    /// The group id was empty.
213    #[error("consumer group metadata requires a non-empty group_id")]
214    EmptyGroupId,
215    /// Active group metadata needs a member id.
216    #[error("consumer group metadata has generation_id > 0 but no member_id")]
217    MissingMemberId,
218}
219
220#[derive(Debug, Error)]
221/// Errors raised by the producer transaction state machine.
222pub enum TransactionStateError {
223    /// A transaction is already open.
224    #[error("transaction already in progress")]
225    AlreadyInProgress,
226    /// The transaction is already completing.
227    #[error("transaction is already completing with {0}")]
228    Completing(&'static str),
229    /// The transaction must be aborted before it can be used again.
230    #[error("transaction must be aborted before reuse: {0}")]
231    MustAbortBeforeReuse(String),
232    /// The transaction is permanently failed.
233    #[error("transaction is unusable: {0}")]
234    Fatal(String),
235    /// Records were sent before a transaction was started.
236    #[error("transactional send requires begin_transaction() before send()")]
237    AppendWithoutBegin,
238    /// Offsets were sent before a transaction was started.
239    #[error("send_offsets_to_transaction requires begin_transaction() first")]
240    SendOffsetsWithoutBegin,
241    /// The transaction needs an abort before more work can happen.
242    #[error("transaction has failed and must be aborted: {0}")]
243    AbortRequired(String),
244    /// The transaction cannot be committed and must be aborted.
245    #[error("transaction cannot be committed and must be aborted: {0}")]
246    CommitRequiresAbort(String),
247    /// There is no active transaction to finish.
248    #[error("no active transaction to complete")]
249    NoActiveTransaction,
250    /// Shutdown found an active transaction.
251    #[error("shutdown stopped with an active transaction still in progress")]
252    ShutdownWithActiveTransaction,
253    /// Shutdown found a transaction completion in progress.
254    #[error("shutdown stopped while transaction was still completing with {0}")]
255    ShutdownWhileCompleting(&'static str),
256    /// Shutdown found a failed transaction that still needs aborting.
257    #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
258    ShutdownAbortRequired(String),
259    /// Shutdown found a fatal transaction error.
260    #[error("transaction failed before shutdown: {0}")]
261    ShutdownFatal(String),
262}