Skip to main content

lnc_client/
error.rs

1use std::fmt;
2
3use std::net::SocketAddr;
4
/// Errors that can occur during client operations.
///
/// The type implements [`std::fmt::Display`] (one human-readable message per
/// variant) and [`std::error::Error`]. The two variants that wrap a
/// [`std::io::Error`] expose it through `source()`; every other variant
/// reports no source. [`ClientError::is_retryable`] classifies each variant
/// as transient or not.
#[derive(Debug)]
pub enum ClientError {
    /// Failed to establish a connection to the server.
    ///
    /// Carries the underlying I/O error, which is also returned by `source()`.
    ConnectionFailed(std::io::Error),
    /// Connection was closed by the server.
    ConnectionClosed,
    /// I/O error during communication.
    ///
    /// This is the variant produced by the `From<std::io::Error>` conversion.
    IoError(std::io::Error),
    /// Protocol-level error (malformed data, invalid state).
    ///
    /// The payload is a free-form description; `lnc_core::LanceError` values
    /// are converted into this variant using their string form.
    ProtocolError(String),
    /// Received an unexpected or invalid response from the server.
    InvalidResponse(String),
    /// Operation timed out.
    Timeout,
    /// CRC checksum mismatch indicating data corruption.
    ///
    /// Both values are rendered in hexadecimal by the `Display` impl.
    CrcMismatch {
        /// Expected CRC value
        expected: u32,
        /// Actual CRC value received
        actual: u32,
    },
    /// Server is applying backpressure, client should slow down.
    ServerBackpressure,
    /// M3: Operation would block (non-blocking mode), client buffer is full.
    ///
    /// `is_retryable` reports `false` for this variant.
    WouldBlock,
    /// Server returned an error message.
    ///
    /// `is_retryable` reports `true` for this variant only when the message
    /// contains `FORWARD_FAILED`.
    ServerError(String),
    /// Server has not yet replicated to the requested offset — backoff and retry
    ServerCatchingUp {
        /// The server's current maximum offset
        server_offset: u64,
    },
    /// Server is not the leader, redirect to the specified address
    NotLeader {
        /// Address of the current leader, if known
        leader_addr: Option<SocketAddr>,
    },
    /// TLS handshake or configuration error.
    TlsError(String),
}
46
47impl fmt::Display for ClientError {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Self::ConnectionFailed(e) => write!(f, "Connection failed: {}", e),
51            Self::ConnectionClosed => write!(f, "Connection closed by server"),
52            Self::IoError(e) => write!(f, "I/O error: {}", e),
53            Self::ProtocolError(msg) => write!(f, "Protocol error: {}", msg),
54            Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
55            Self::Timeout => write!(f, "Operation timed out"),
56            Self::CrcMismatch { expected, actual } => {
57                write!(
58                    f,
59                    "CRC mismatch: expected {:#x}, got {:#x}",
60                    expected, actual
61                )
62            },
63            Self::ServerBackpressure => write!(f, "Server signaled backpressure"),
64            Self::WouldBlock => write!(f, "Operation would block (buffer full)"),
65            Self::ServerError(msg) => write!(f, "Server error: {}", msg),
66            Self::ServerCatchingUp { server_offset } => {
67                write!(f, "Server catching up (at offset {})", server_offset)
68            },
69            Self::NotLeader { leader_addr } => match leader_addr {
70                Some(addr) => write!(f, "Not leader, redirect to {}", addr),
71                None => write!(f, "Not leader, leader unknown"),
72            },
73            Self::TlsError(msg) => write!(f, "TLS error: {}", msg),
74        }
75    }
76}
77
/// Interprets a server error message as a `NOT_LEADER` notification.
///
/// # Returns
///
/// * `None` when `msg` does not begin with `NOT_LEADER:`, i.e. it is some
///   other kind of error message.
/// * `Some(None)` when it is a `NOT_LEADER` message but no usable leader
///   address is available: the text contains `leader unknown`, the redirect
///   clause is absent, or the address does not parse as a [`SocketAddr`].
/// * `Some(Some(addr))` when the message has the exact form
///   `NOT_LEADER: redirect to <ip>:<port>`; whitespace surrounding the
///   address is ignored.
pub fn parse_not_leader_error(msg: &str) -> Option<Option<SocketAddr>> {
    const TAG: &str = "NOT_LEADER:";
    const REDIRECT: &str = "NOT_LEADER: redirect to ";

    if !msg.starts_with(TAG) {
        return None;
    }

    // An explicit "leader unknown" wins over any redirect text; otherwise an
    // address is reported only if the redirect clause is present and parses.
    let leader = if msg.contains("leader unknown") {
        None
    } else {
        msg.strip_prefix(REDIRECT)
            .and_then(|candidate| candidate.trim().parse::<SocketAddr>().ok())
    };

    Some(leader)
}
97
98impl ClientError {
99    /// Returns true if this error is transient and the operation should be retried
100    /// after reconnecting. Used by Producer and Consumer for automatic retry logic.
101    pub fn is_retryable(&self) -> bool {
102        match self {
103            // Connection-level failures — reconnect and retry
104            Self::ConnectionClosed | Self::ConnectionFailed(_) | Self::IoError(_) => true,
105            // Timeouts are transient — server might be busy during election
106            Self::Timeout => true,
107            // Backpressure — server wants us to slow down, retry after delay
108            Self::ServerBackpressure => true,
109            // NOT_LEADER — need to reconnect to a different node
110            Self::NotLeader { .. } => true,
111            // CATCHING_UP — server behind, backoff and retry
112            Self::ServerCatchingUp { .. } => true,
113            // Server errors containing FORWARD_FAILED — leader unknown/unreachable
114            // during election, retry after reconnect to potentially different node
115            Self::ServerError(msg) => msg.contains("FORWARD_FAILED"),
116            // Non-retryable: ProtocolError, InvalidResponse, CrcMismatch, TlsError
117            _ => false,
118        }
119    }
120}
121
122impl std::error::Error for ClientError {
123    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
124        match self {
125            Self::ConnectionFailed(e) | Self::IoError(e) => Some(e),
126            _ => None,
127        }
128    }
129}
130
131impl From<std::io::Error> for ClientError {
132    fn from(err: std::io::Error) -> Self {
133        Self::IoError(err)
134    }
135}
136
137impl From<lnc_core::LanceError> for ClientError {
138    fn from(err: lnc_core::LanceError) -> Self {
139        Self::ProtocolError(err.to_string())
140    }
141}
142
/// Convenience alias for [`std::result::Result`] with the error type fixed to
/// [`ClientError`].
pub type Result<T> = std::result::Result<T, ClientError>;