// crabka_client_consumer/error.rs
use thiserror::Error;

5#[derive(Debug, Error)]
7#[non_exhaustive]
8pub enum ConsumerError {
9 #[error("client: {0}")]
10 Client(#[from] crabka_client_core::ClientError),
11
12 #[error("protocol: {0}")]
13 Protocol(#[from] crabka_protocol::ProtocolError),
14
15 #[error("rebalance failed: {0}")]
16 RebalanceFailed(String),
17
18 #[error("not subscribed to any topic")]
19 NotSubscribed,
20
21 #[error("illegal state: {0}")]
22 IllegalState(String),
23
24 #[error("commit conflict: rejoined since this poll")]
25 CommitInvalid,
26
27 #[error("coordinator unavailable")]
28 CoordinatorUnavailable,
29
30 #[error(
31 "log truncation detected on {topic}-{partition}: fetch offset {fetch_offset} is past the leader's log; safe offset {safe_offset}"
32 )]
33 LogTruncation {
34 topic: String,
35 partition: i32,
36 fetch_offset: i64,
37 safe_offset: i64,
38 },
39
40 #[error("broker error_code {0}")]
41 Server(i16),
42}
43
/// Unit tests pinning the `Display` output of selected [`ConsumerError`] variants.
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// The unit variant renders its fixed message.
    #[test]
    fn display_not_subscribed() {
        let e = ConsumerError::NotSubscribed;
        assert!(e.to_string().contains("not subscribed"));
    }

    /// The broker error code is included in the rendered message.
    #[test]
    fn display_server_error_code() {
        let e = ConsumerError::Server(25);
        assert!(e.to_string().contains("25"));
    }

    /// Every field of `LogTruncation` shows up in the rendered message.
    #[test]
    fn display_log_truncation() {
        let e = ConsumerError::LogTruncation {
            topic: "t".into(),
            partition: 3,
            fetch_offset: 100,
            safe_offset: 42,
        };
        let s = e.to_string();
        assert!(s.contains("truncation"));
        // Topic and partition are rendered joined as `{topic}-{partition}`.
        assert!(s.contains("t-3"));
        assert!(s.contains("100"));
        assert!(s.contains("42"));
    }
}