1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Internal errors produced by the broker's handlers and lifecycle.
//!
//! These are NOT Kafka wire-level error codes (those live in
//! [`crate::codes`]). Conversion from `BrokerError` to a wire code
//! happens at the handler boundary.
use thiserror::Error;
/// Errors produced by the broker's lifecycle and handlers.
///
/// Returned from [`crate::Broker::start`] and propagated up from
/// per-connection serve loops. The `#[non_exhaustive]` attribute lets
/// future variants be added without a breaking change.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum BrokerError {
/// Filesystem I/O failure (binding the listener, opening log dirs).
#[error("I/O: {0}")]
Io(#[from] std::io::Error),
/// Storage-layer error bubbling up from [`crabka_log`].
#[error("log: {0}")]
Log(#[from] crabka_log::LogError),
/// Wire-protocol decoding or encoding error.
#[error("protocol: {0}")]
Protocol(#[from] crabka_protocol::ProtocolError),
/// The peer sent a `(api_key, version)` the handler table doesn't
/// know how to serve.
#[error("unsupported api_key={api_key} version={version}")]
UnsupportedApi {
/// The unsupported Kafka API key.
api_key: i16,
/// The unsupported version negotiated by the peer.
version: i16,
},
/// A produce request landed on a partition whose writer actor has
/// exited — typically only seen at shutdown.
#[error("partition writer for {topic}-{partition} died")]
PartitionWriterDied {
/// Topic name of the dead writer.
topic: String,
/// Partition index of the dead writer.
partition: i32,
},
/// The broker is shutting down and refuses new work.
#[error("shutting down")]
Shutdown,
/// A failure that occurred during [`crate::Broker::start`] — controller
/// bring-up, leader election timeout, etc.
#[error("startup failed: {0}")]
Startup(String),
/// A group-coordinator request arrived while the group is in a state
/// that doesn't allow it (e.g. heartbeat during `PreparingRebalance`).
#[error("group {group_id} is in state {state:?}, request not allowed")]
GroupInvalidState {
/// The affected group id.
group_id: String,
/// The current `GroupState` rendered via `Debug`.
state: String,
},
/// The client referenced a `member_id` the coordinator doesn't track
/// for this group.
#[error("unknown member {member_id} in group {group_id}")]
UnknownMember {
/// The affected group id.
group_id: String,
/// The unrecognized member id.
member_id: String,
},
/// The client sent a request bound to a stale generation.
#[error("group {group_id} generation mismatch: have {current}, got {requested}")]
GenerationMismatch {
/// The affected group id.
group_id: String,
/// The coordinator's current generation.
current: i32,
/// The generation the client supplied.
requested: i32,
},
/// The client's producer epoch is older than the current one registered
/// for this producer id.
#[error("producer epoch fenced: pid={producer_id} got {requested}, current {current}")]
ProducerEpochFenced {
/// The producer id that was fenced.
producer_id: i64,
/// The epoch currently registered for this producer id.
current: i16,
/// The epoch the client supplied.
requested: i16,
},
#[error("fenced leader epoch (have={have}, current={current})")]
FencedLeaderEpoch { have: i32, current: i32 },
#[error("unknown leader epoch ({0})")]
UnknownLeaderEpoch(i32),
/// A replication-layer failure (fetch from leader failed, truncation
/// error, etc.). Maps to `UNKNOWN_SERVER_ERROR` on the wire.
#[error("replication: {0}")]
Replication(String),
/// A transactional operation failed. Maps to `UNKNOWN_SERVER_ERROR` on
/// the wire; specific wire codes are chosen by handlers.
#[error("transaction: {0}")]
Txn(String),
/// A KIP-932 share-coordinator (persister) operation failed. Maps to
/// `UNKNOWN_SERVER_ERROR` on the wire; specific wire codes are chosen by
/// handlers.
#[error("share: {0}")]
Share(String),
/// Two listeners share the same `bind_addr`.
#[error("listener bind conflict: {a} and {b} share bind_addr")]
ListenerConflict { a: String, b: String },
/// `inter_broker_listener_name` does not match any listener name.
#[error("inter_broker_listener_name {name} not in listeners list")]
InvalidInterBrokerListener { name: String },
/// `process.roles` was empty — a node must be at least one of
/// `controller` / `broker`.
#[error("process.roles must list at least one role")]
EmptyRoles,
/// A non-controller node lists itself in `controller_quorum_voters`.
#[error("node {node_id} is not a controller but appears in its own controller_quorum_voters")]
NonControllerIsVoter { node_id: crabka_raft::NodeId },
/// A SASL listener is declared but `enabled_sasl_mechanisms` is empty.
#[error("SASL listener {name} declared but enabled_sasl_mechanisms is empty")]
SaslListenerNoMechanisms { name: String },
/// `Gssapi` is an enabled SASL mechanism but no `gssapi` config (keytab,
/// service name, principal mapping) was provided.
#[error("GSSAPI is an enabled SASL mechanism but gssapi config is missing")]
GssapiConfigMissing,
/// TLS configuration error.
#[error("tls: {0}")]
Tls(String),
/// Failed to read or decode the bootstrap records file written by
/// `crabka format --add-scram`.
#[error("bootstrap file {path:?}: {source}")]
BootstrapFile {
/// Path to the file that could not be read or decoded.
path: std::path::PathBuf,
/// Underlying I/O or decode error.
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("invalid leader_imbalance_check_interval_secs = {value}: must be >= 1")]
InvalidLeaderRebalanceInterval { value: u64 },
#[error("invalid leader_imbalance_per_broker_percentage = {value}: must be <= 100")]
InvalidLeaderRebalanceThreshold { value: u32 },
#[error("controlled shutdown did not complete within {0:?}")]
ShutdownTimeout(std::time::Duration),
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn display_unsupported_api() {
let e = BrokerError::UnsupportedApi {
api_key: 7,
version: 9,
};
assert!(e.to_string().contains("api_key=7"));
assert!(e.to_string().contains("version=9"));
}
}