Skip to main content

lnc_client/
error.rs

1use std::fmt;
2
3use std::net::SocketAddr;
4
/// Every failure a client operation can report.
///
/// Variants cover local conditions (I/O, timeouts, validation) as well as
/// conditions signalled by the server (backpressure, leadership changes,
/// replication lag).
#[derive(Debug)]
pub enum ClientError {
    /// The connection to the server could not be established.
    ConnectionFailed(std::io::Error),
    /// The server closed the connection.
    ConnectionClosed,
    /// An I/O error occurred while communicating.
    IoError(std::io::Error),
    /// A protocol-level problem such as malformed data or an invalid state.
    ProtocolError(String),
    /// The server sent a response that was unexpected or invalid.
    InvalidResponse(String),
    /// The operation did not complete in time.
    Timeout,
    /// A CRC checksum did not match, indicating data corruption.
    CrcMismatch {
        /// The CRC value that was expected.
        expected: u32,
        /// The CRC value that was actually received.
        actual: u32,
    },
    /// The server is applying backpressure; the client should slow down.
    ServerBackpressure,
    /// M3: the operation would block (non-blocking mode) because the client
    /// buffer is full.
    WouldBlock,
    /// The server answered with an error message.
    ServerError(String),
    /// The server has not yet replicated up to the requested offset; back off
    /// and retry.
    ServerCatchingUp {
        /// The server's current maximum offset.
        server_offset: u64,
    },
    /// The contacted server is not the leader; redirect to the given address.
    NotLeader {
        /// Address of the current leader, when it is known.
        leader_addr: Option<SocketAddr>,
    },
    /// The TLS handshake or TLS configuration failed.
    TlsError(String),
    /// A topic name failed validation; names must match `[a-zA-Z0-9-]+`.
    InvalidTopicName(String),
}
48
49impl fmt::Display for ClientError {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        match self {
52            Self::ConnectionFailed(e) => write!(f, "Connection failed: {}", e),
53            Self::ConnectionClosed => write!(f, "Connection closed by server"),
54            Self::IoError(e) => write!(f, "I/O error: {}", e),
55            Self::ProtocolError(msg) => write!(f, "Protocol error: {}", msg),
56            Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
57            Self::Timeout => write!(f, "Operation timed out"),
58            Self::CrcMismatch { expected, actual } => {
59                write!(
60                    f,
61                    "CRC mismatch: expected {:#x}, got {:#x}",
62                    expected, actual
63                )
64            },
65            Self::ServerBackpressure => write!(f, "Server signaled backpressure"),
66            Self::WouldBlock => write!(f, "Operation would block (buffer full)"),
67            Self::ServerError(msg) => write!(f, "Server error: {}", msg),
68            Self::ServerCatchingUp { server_offset } => {
69                write!(f, "Server catching up (at offset {})", server_offset)
70            },
71            Self::NotLeader { leader_addr } => match leader_addr {
72                Some(addr) => write!(f, "Not leader, redirect to {}", addr),
73                None => write!(f, "Not leader, leader unknown"),
74            },
75            Self::TlsError(msg) => write!(f, "TLS error: {}", msg),
76            Self::InvalidTopicName(name) => {
77                write!(f, "Invalid topic name {:?}: must match [a-zA-Z0-9-]+", name)
78            },
79        }
80    }
81}
82
/// Interpret a server error message as a NOT_LEADER notification.
///
/// # Returns
///
/// * `None` — `msg` does not start with `NOT_LEADER:`, so it is some other error.
/// * `Some(Some(addr))` — the message has the exact form
///   `NOT_LEADER: redirect to <addr>` and `<addr>` (surrounding whitespace
///   ignored) parses as a [`SocketAddr`].
/// * `Some(None)` — it is a NOT_LEADER message but no usable address could be
///   extracted (for example `NOT_LEADER: leader unknown`, or an address that
///   does not parse).
pub fn parse_not_leader_error(msg: &str) -> Option<Option<SocketAddr>> {
    // Anything without the marker is not a NOT_LEADER message at all.
    let detail = msg.strip_prefix("NOT_LEADER:")?;

    // Only the "redirect to X.X.X.X:PORT" form can carry a leader address; every
    // other payload, including "leader unknown", yields no address.
    let leader = detail
        .strip_prefix(" redirect to ")
        .and_then(|candidate| candidate.trim().parse::<SocketAddr>().ok());

    Some(leader)
}
102
103impl ClientError {
104    /// Returns true if this error is transient and the operation should be retried
105    /// after reconnecting. Used by Producer and Consumer for automatic retry logic.
106    pub fn is_retryable(&self) -> bool {
107        match self {
108            // Connection-level failures — reconnect and retry
109            Self::ConnectionClosed | Self::ConnectionFailed(_) | Self::IoError(_) => true,
110            // Timeouts are transient — server might be busy during election
111            Self::Timeout => true,
112            // Backpressure — server wants us to slow down, retry after delay
113            Self::ServerBackpressure => true,
114            // NOT_LEADER — need to reconnect to a different node
115            Self::NotLeader { .. } => true,
116            // CATCHING_UP — server behind, backoff and retry
117            Self::ServerCatchingUp { .. } => true,
118            // Server errors containing FORWARD_FAILED — leader unknown/unreachable
119            // during election, retry after reconnect to potentially different node
120            Self::ServerError(msg) => msg.contains("FORWARD_FAILED"),
121            // Non-retryable: ProtocolError, InvalidResponse, CrcMismatch, TlsError,
122            // InvalidTopicName (client-side validation failure)
123            _ => false,
124        }
125    }
126}
127
128/// Validate that a topic name contains only `[a-zA-Z0-9-]` characters.
129///
130/// LANCE topic names are restricted to alphanumeric characters and hyphens so
131/// that they can be safely embedded in file paths and URL segments on all
132/// platforms supported by the server.
133///
134/// # Errors
135///
136/// Returns [`ClientError::InvalidTopicName`] when `name` is empty or contains
137/// any character outside `[a-zA-Z0-9-]`.
138///
139/// # Examples
140///
141/// ```rust
142/// use lnc_client::{ClientError, validate_topic_name};
143///
144/// assert!(validate_topic_name("my-topic").is_ok());
145/// assert!(validate_topic_name("rithmic-actions-v2").is_ok());
146/// assert!(validate_topic_name("topic123").is_ok());
147/// assert!(matches!(
148///     validate_topic_name("bad topic!"),
149///     Err(ClientError::InvalidTopicName(_))
150/// ));
151/// assert!(matches!(
152///     validate_topic_name(""),
153///     Err(ClientError::InvalidTopicName(_))
154/// ));
155/// ```
156pub fn validate_topic_name(name: &str) -> Result<()> {
157    if name.is_empty() || !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') {
158        return Err(ClientError::InvalidTopicName(name.to_owned()));
159    }
160    Ok(())
161}
162
163impl std::error::Error for ClientError {
164    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
165        match self {
166            Self::ConnectionFailed(e) | Self::IoError(e) => Some(e),
167            _ => None,
168        }
169    }
170}
171
172impl From<std::io::Error> for ClientError {
173    fn from(err: std::io::Error) -> Self {
174        Self::IoError(err)
175    }
176}
177
178impl From<lnc_core::LanceError> for ClientError {
179    fn from(err: lnc_core::LanceError) -> Self {
180        Self::ProtocolError(err.to_string())
181    }
182}
183
184pub type Result<T> = std::result::Result<T, ClientError>;