Skip to main content

lnc_client/
error.rs

1use std::fmt;
2
3use std::net::SocketAddr;
4
/// Everything that can go wrong while a client talks to the server.
#[derive(Debug)]
pub enum ClientError {
    /// The connection to the server could not be established.
    ConnectionFailed(std::io::Error),
    /// The server closed the connection.
    ConnectionClosed,
    /// An I/O failure occurred while communicating.
    IoError(std::io::Error),
    /// The protocol was violated (malformed data or invalid state).
    ProtocolError(String),
    /// The server's response was unexpected or invalid.
    InvalidResponse(String),
    /// The operation did not complete in time.
    Timeout,
    /// A CRC checksum did not match, which indicates corrupted data.
    CrcMismatch {
        /// CRC value that was expected.
        expected: u32,
        /// CRC value that was actually received.
        actual: u32,
    },
    /// The server is applying backpressure; the client should slow down.
    ServerBackpressure,
    /// The server reported an error; the payload is its message.
    ServerError(String),
    /// The server has not yet replicated up to the requested offset;
    /// back off and retry.
    ServerCatchingUp {
        /// Current maximum offset on the server.
        server_offset: u64,
    },
    /// The contacted server is not the leader; redirect to the given address.
    NotLeader {
        /// Address of the current leader, when it is known.
        leader_addr: Option<SocketAddr>,
    },
    /// A TLS handshake or TLS configuration error.
    TlsError(String),
}
44
45impl fmt::Display for ClientError {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::ConnectionFailed(e) => write!(f, "Connection failed: {}", e),
49            Self::ConnectionClosed => write!(f, "Connection closed by server"),
50            Self::IoError(e) => write!(f, "I/O error: {}", e),
51            Self::ProtocolError(msg) => write!(f, "Protocol error: {}", msg),
52            Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
53            Self::Timeout => write!(f, "Operation timed out"),
54            Self::CrcMismatch { expected, actual } => {
55                write!(
56                    f,
57                    "CRC mismatch: expected {:#x}, got {:#x}",
58                    expected, actual
59                )
60            },
61            Self::ServerBackpressure => write!(f, "Server signaled backpressure"),
62            Self::ServerError(msg) => write!(f, "Server error: {}", msg),
63            Self::ServerCatchingUp { server_offset } => {
64                write!(f, "Server catching up (at offset {})", server_offset)
65            },
66            Self::NotLeader { leader_addr } => match leader_addr {
67                Some(addr) => write!(f, "Not leader, redirect to {}", addr),
68                None => write!(f, "Not leader, leader unknown"),
69            },
70            Self::TlsError(msg) => write!(f, "TLS error: {}", msg),
71        }
72    }
73}
74
/// Interprets a server error message as a NOT_LEADER notification.
///
/// Returns:
/// - `None` when `msg` does not start with `NOT_LEADER:` (it is some other error);
/// - `Some(Some(addr))` when the message has the exact form
///   `NOT_LEADER: redirect to <socket address>` and the address parses
///   (surrounding whitespace around the address is ignored);
/// - `Some(None)` for every other NOT_LEADER message, including ones that
///   contain `leader unknown` or whose redirect address does not parse.
pub fn parse_not_leader_error(msg: &str) -> Option<Option<SocketAddr>> {
    const TAG: &str = "NOT_LEADER:";
    const REDIRECT_PREFIX: &str = "NOT_LEADER: redirect to ";

    // Anything without the tag is not a NOT_LEADER message at all.
    if !msg.starts_with(TAG) {
        return None;
    }

    let leader = if msg.contains("leader unknown") {
        // An explicit "unknown" marker wins over any redirect text.
        None
    } else {
        msg.strip_prefix(REDIRECT_PREFIX)
            .and_then(|candidate| candidate.trim().parse::<SocketAddr>().ok())
    };

    Some(leader)
}
94
95impl ClientError {
96    /// Returns true if this error is transient and the operation should be retried
97    /// after reconnecting. Used by Producer and Consumer for automatic retry logic.
98    pub fn is_retryable(&self) -> bool {
99        match self {
100            // Connection-level failures — reconnect and retry
101            Self::ConnectionClosed | Self::ConnectionFailed(_) | Self::IoError(_) => true,
102            // Timeouts are transient — server might be busy during election
103            Self::Timeout => true,
104            // Backpressure — server wants us to slow down, retry after delay
105            Self::ServerBackpressure => true,
106            // NOT_LEADER — need to reconnect to a different node
107            Self::NotLeader { .. } => true,
108            // CATCHING_UP — server behind, backoff and retry
109            Self::ServerCatchingUp { .. } => true,
110            // Server errors containing FORWARD_FAILED — leader unknown/unreachable
111            // during election, retry after reconnect to potentially different node
112            Self::ServerError(msg) => msg.contains("FORWARD_FAILED"),
113            // Non-retryable: ProtocolError, InvalidResponse, CrcMismatch, TlsError
114            _ => false,
115        }
116    }
117}
118
119impl std::error::Error for ClientError {
120    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
121        match self {
122            Self::ConnectionFailed(e) | Self::IoError(e) => Some(e),
123            _ => None,
124        }
125    }
126}
127
128impl From<std::io::Error> for ClientError {
129    fn from(err: std::io::Error) -> Self {
130        Self::IoError(err)
131    }
132}
133
134impl From<lnc_core::LanceError> for ClientError {
135    fn from(err: lnc_core::LanceError) -> Self {
136        Self::ProtocolError(err.to_string())
137    }
138}
139
140pub type Result<T> = std::result::Result<T, ClientError>;