// kafkit_client/core/error.rs
use thiserror::Error;
use tokio::task::JoinError;
20
21pub type Result<T> = std::result::Result<T, Error>;
23
24#[derive(Debug, Error)]
25pub enum Error {
27 #[error("operation cancelled")]
29 Cancelled,
30 #[error(transparent)]
32 Admin(#[from] AdminError),
33 #[error(transparent)]
35 Consumer(#[from] ConsumerError),
36 #[error(transparent)]
38 Producer(#[from] ProducerError),
39 #[error(transparent)]
41 ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
42 #[error(transparent)]
44 TransactionState(#[from] TransactionStateError),
45 #[error(transparent)]
47 Internal(#[from] anyhow::Error),
48}
49
50#[derive(Debug, Error)]
51pub enum AdminError {
53 #[error("topic names must be non-empty")]
55 EmptyTopicName,
56 #[error("topic partition count must be positive: {partitions}")]
58 InvalidPartitionCount {
59 partitions: i32,
61 },
62 #[error("topic replication factor must be positive: {replication_factor}")]
64 InvalidReplicationFactor {
65 replication_factor: i16,
67 },
68}
69
70#[derive(Debug, Error)]
71pub enum ConsumerError {
73 #[error("consumer runtime stopped before {operation}")]
75 ThreadStoppedBefore {
76 operation: &'static str,
78 },
79 #[error("consumer runtime stopped during {operation}")]
81 ThreadStoppedDuring {
82 operation: &'static str,
84 },
85 #[error("failed to join consumer runtime: {0}")]
87 Join(#[source] JoinError),
88 #[error("subscribe requires at least one non-empty topic name")]
90 EmptySubscription,
91 #[error("subscribe_pattern requires a non-empty pattern")]
93 EmptySubscriptionPattern,
94 #[error("subscribe_regex requires a valid regular expression: {message}")]
96 InvalidSubscriptionRegex {
97 message: String,
99 },
100 #[error("concurrent poll calls are not supported by this simple consumer")]
102 ConcurrentPoll,
103 #[error("poll was interrupted by wakeup()")]
105 Wakeup,
106 #[error("seek offset must be non-negative: {offset}")]
108 InvalidSeekOffset {
109 offset: i64,
111 },
112 #[error("topic partition names must be non-empty")]
114 EmptyTopicPartition,
115 #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
117 PartitionNotAssigned {
118 operation: &'static str,
120 topic: String,
122 partition: i32,
124 },
125 #[error("broker rejected the subscription regex: {message}")]
127 InvalidRegularExpression {
128 message: String,
130 },
131 #[error("broker rejected the configured server assignor '{assignor}': {message}")]
133 UnsupportedAssignor {
134 assignor: String,
136 message: String,
138 },
139 #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
141 UnreleasedInstanceId {
142 instance_id: String,
144 message: String,
146 },
147 #[error("static member '{instance_id}' was fenced: {message}")]
149 FencedInstanceId {
150 instance_id: String,
152 message: String,
154 },
155}
156
157#[derive(Debug, Error)]
158pub enum ProducerError {
160 #[error("idempotent producers require acks=-1")]
162 IdempotenceRequiresAcksAll,
163 #[error("idempotent producers require max_retries > 0")]
165 IdempotenceRequiresRetries,
166 #[error(
168 "transactional producers require acks=-1 so the broker can commit the full transaction"
169 )]
170 TransactionalRequiresAcksAll,
171 #[error("broker did not advertise finalized feature level for transaction.version")]
173 MissingTransactionVersionFeature,
174 #[error(
176 "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
177 )]
178 UnsupportedTransactionVersion {
179 level: i16,
181 },
182 #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
184 UnsupportedApiVersion {
185 api: &'static str,
187 min_version: i16,
189 broker_version: i16,
191 },
192 #[error("producer is not configured with a transactional_id")]
194 NotTransactional,
195 #[error("producer sender thread stopped before {operation}")]
197 ThreadStoppedBefore {
198 operation: &'static str,
200 },
201 #[error("producer sender thread stopped during {operation}")]
203 ThreadStoppedDuring {
204 operation: &'static str,
206 },
207 #[error("failed to join producer sender thread: {0}")]
209 Join(#[source] JoinError),
210}
211
212#[derive(Debug, Error)]
213pub enum ConsumerGroupMetadataError {
215 #[error("consumer group metadata requires a non-empty group_id")]
217 EmptyGroupId,
218 #[error("consumer group metadata has generation_id > 0 but no member_id")]
220 MissingMemberId,
221}
222
223#[derive(Debug, Error)]
224pub enum TransactionStateError {
226 #[error("transaction already in progress")]
228 AlreadyInProgress,
229 #[error("transaction is already completing with {0}")]
231 Completing(&'static str),
232 #[error("transaction must be aborted before reuse: {0}")]
234 MustAbortBeforeReuse(String),
235 #[error("transaction is unusable: {0}")]
237 Fatal(String),
238 #[error("transactional send requires begin_transaction() before send()")]
240 AppendWithoutBegin,
241 #[error("send_offsets_to_transaction requires begin_transaction() first")]
243 SendOffsetsWithoutBegin,
244 #[error("transaction has failed and must be aborted: {0}")]
246 AbortRequired(String),
247 #[error("transaction cannot be committed and must be aborted: {0}")]
249 CommitRequiresAbort(String),
250 #[error("no active transaction to complete")]
252 NoActiveTransaction,
253 #[error("shutdown stopped with an active transaction still in progress")]
255 ShutdownWithActiveTransaction,
256 #[error("shutdown stopped while transaction was still completing with {0}")]
258 ShutdownWhileCompleting(&'static str),
259 #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
261 ShutdownAbortRequired(String),
262 #[error("transaction failed before shutdown: {0}")]
264 ShutdownFatal(String),
265}