Skip to main content

crabka_client_consumer/
error.rs

1//! Error type for `crabka-client-consumer`.
2
3use thiserror::Error;
4
5/// Errors returned by [`Consumer`](crate::Consumer).
6#[derive(Debug, Error)]
7#[non_exhaustive]
8pub enum ConsumerError {
9    #[error("client: {0}")]
10    Client(#[from] crabka_client_core::ClientError),
11
12    #[error("protocol: {0}")]
13    Protocol(#[from] crabka_protocol::ProtocolError),
14
15    #[error("rebalance failed: {0}")]
16    RebalanceFailed(String),
17
18    #[error("not subscribed to any topic")]
19    NotSubscribed,
20
21    #[error("illegal state: {0}")]
22    IllegalState(String),
23
24    #[error("commit conflict: rejoined since this poll")]
25    CommitInvalid,
26
27    #[error("coordinator unavailable")]
28    CoordinatorUnavailable,
29
30    #[error(
31        "log truncation detected on {topic}-{partition}: fetch offset {fetch_offset} is past the leader's log; safe offset {safe_offset}"
32    )]
33    LogTruncation {
34        topic: String,
35        partition: i32,
36        fetch_offset: i64,
37        safe_offset: i64,
38    },
39
40    #[error("broker error_code {0}")]
41    Server(i16),
42}
43
/// Smoke tests for the `Display` output of selected [`ConsumerError`]
/// variants. They check substrings only, not the full message.
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// The unit variant's message mentions the missing subscription.
    #[test]
    fn display_not_subscribed() {
        let rendered = ConsumerError::NotSubscribed.to_string();
        assert!(rendered.contains("not subscribed"));
    }

    /// The numeric code carried by `Server` shows up in the message.
    #[test]
    fn display_server_error_code() {
        let rendered = ConsumerError::Server(25).to_string();
        assert!(rendered.contains("25"));
    }

    /// The truncation message names the condition and includes the safe offset.
    #[test]
    fn display_log_truncation() {
        let err = ConsumerError::LogTruncation {
            topic: String::from("t"),
            partition: 3,
            fetch_offset: 100,
            safe_offset: 42,
        };
        let rendered = err.to_string();
        for needle in ["truncation", "42"] {
            assert!(rendered.contains(needle));
        }
    }
}