// kafkit_client/core/error.rs
use thiserror::Error;
use tokio::task::JoinError;
20
21pub type Result<T> = std::result::Result<T, Error>;
23
24#[derive(Debug, Error)]
25pub enum Error {
27 #[error(transparent)]
29 Admin(#[from] AdminError),
30 #[error(transparent)]
32 Consumer(#[from] ConsumerError),
33 #[error(transparent)]
35 Producer(#[from] ProducerError),
36 #[error(transparent)]
38 ConsumerGroupMetadata(#[from] ConsumerGroupMetadataError),
39 #[error(transparent)]
41 TransactionState(#[from] TransactionStateError),
42 #[error(transparent)]
44 Internal(#[from] anyhow::Error),
45}
46
47#[derive(Debug, Error)]
48pub enum AdminError {
50 #[error("topic names must be non-empty")]
52 EmptyTopicName,
53 #[error("topic partition count must be positive: {partitions}")]
55 InvalidPartitionCount {
56 partitions: i32,
58 },
59 #[error("topic replication factor must be positive: {replication_factor}")]
61 InvalidReplicationFactor {
62 replication_factor: i16,
64 },
65}
66
67#[derive(Debug, Error)]
68pub enum ConsumerError {
70 #[error("consumer runtime stopped before {operation}")]
72 ThreadStoppedBefore {
73 operation: &'static str,
75 },
76 #[error("consumer runtime stopped during {operation}")]
78 ThreadStoppedDuring {
79 operation: &'static str,
81 },
82 #[error("failed to join consumer runtime: {0}")]
84 Join(#[source] JoinError),
85 #[error("subscribe requires at least one non-empty topic name")]
87 EmptySubscription,
88 #[error("subscribe_pattern requires a non-empty pattern")]
90 EmptySubscriptionPattern,
91 #[error("subscribe_regex requires a valid regular expression: {message}")]
93 InvalidSubscriptionRegex {
94 message: String,
96 },
97 #[error("concurrent poll calls are not supported by this simple consumer")]
99 ConcurrentPoll,
100 #[error("poll was interrupted by wakeup()")]
102 Wakeup,
103 #[error("seek offset must be non-negative: {offset}")]
105 InvalidSeekOffset {
106 offset: i64,
108 },
109 #[error("topic partition names must be non-empty")]
111 EmptyTopicPartition,
112 #[error("{operation} requires an assigned partition, but {topic}:{partition} is not assigned")]
114 PartitionNotAssigned {
115 operation: &'static str,
117 topic: String,
119 partition: i32,
121 },
122 #[error("broker rejected the subscription regex: {message}")]
124 InvalidRegularExpression {
125 message: String,
127 },
128 #[error("broker rejected the configured server assignor '{assignor}': {message}")]
130 UnsupportedAssignor {
131 assignor: String,
133 message: String,
135 },
136 #[error("static member '{instance_id}' is still owned by another consumer: {message}")]
138 UnreleasedInstanceId {
139 instance_id: String,
141 message: String,
143 },
144 #[error("static member '{instance_id}' was fenced: {message}")]
146 FencedInstanceId {
147 instance_id: String,
149 message: String,
151 },
152}
153
154#[derive(Debug, Error)]
155pub enum ProducerError {
157 #[error("idempotent producers require acks=-1")]
159 IdempotenceRequiresAcksAll,
160 #[error("idempotent producers require max_retries > 0")]
162 IdempotenceRequiresRetries,
163 #[error(
165 "transactional producers require acks=-1 so the broker can commit the full transaction"
166 )]
167 TransactionalRequiresAcksAll,
168 #[error("broker did not advertise finalized feature level for transaction.version")]
170 MissingTransactionVersionFeature,
171 #[error(
173 "broker finalized transaction.version={level}, but transaction v2 requires transaction.version>=2"
174 )]
175 UnsupportedTransactionVersion {
176 level: i16,
178 },
179 #[error("{api} v{min_version}+ is required, broker only supports v{broker_version}")]
181 UnsupportedApiVersion {
182 api: &'static str,
184 min_version: i16,
186 broker_version: i16,
188 },
189 #[error("producer is not configured with a transactional_id")]
191 NotTransactional,
192 #[error("producer sender thread stopped before {operation}")]
194 ThreadStoppedBefore {
195 operation: &'static str,
197 },
198 #[error("producer sender thread stopped during {operation}")]
200 ThreadStoppedDuring {
201 operation: &'static str,
203 },
204 #[error("failed to join producer sender thread: {0}")]
206 Join(#[source] JoinError),
207}
208
209#[derive(Debug, Error)]
210pub enum ConsumerGroupMetadataError {
212 #[error("consumer group metadata requires a non-empty group_id")]
214 EmptyGroupId,
215 #[error("consumer group metadata has generation_id > 0 but no member_id")]
217 MissingMemberId,
218}
219
220#[derive(Debug, Error)]
221pub enum TransactionStateError {
223 #[error("transaction already in progress")]
225 AlreadyInProgress,
226 #[error("transaction is already completing with {0}")]
228 Completing(&'static str),
229 #[error("transaction must be aborted before reuse: {0}")]
231 MustAbortBeforeReuse(String),
232 #[error("transaction is unusable: {0}")]
234 Fatal(String),
235 #[error("transactional send requires begin_transaction() before send()")]
237 AppendWithoutBegin,
238 #[error("send_offsets_to_transaction requires begin_transaction() first")]
240 SendOffsetsWithoutBegin,
241 #[error("transaction has failed and must be aborted: {0}")]
243 AbortRequired(String),
244 #[error("transaction cannot be committed and must be aborted: {0}")]
246 CommitRequiresAbort(String),
247 #[error("no active transaction to complete")]
249 NoActiveTransaction,
250 #[error("shutdown stopped with an active transaction still in progress")]
252 ShutdownWithActiveTransaction,
253 #[error("shutdown stopped while transaction was still completing with {0}")]
255 ShutdownWhileCompleting(&'static str),
256 #[error("shutdown stopped with an un-aborted failed transaction: {0}")]
258 ShutdownAbortRequired(String),
259 #[error("transaction failed before shutdown: {0}")]
261 ShutdownFatal(String),
262}