Skip to main content

drasi_mssql_common/
error.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Error types for MS SQL CDC source
16
17use std::fmt;
18
19/// Errors specific to MS SQL CDC operations
20#[derive(Debug)]
21pub enum MsSqlError {
22    /// Connection-related errors (network issues, authentication failures, etc.)
23    Connection(ConnectionError),
24
25    /// LSN-related errors (invalid LSN, out of range, etc.)
26    Lsn(LsnError),
27
28    /// Primary key errors (missing key, NULL values, etc.)
29    PrimaryKey(PrimaryKeyError),
30
31    /// SQL identifier validation errors
32    InvalidIdentifier(String),
33
34    /// Query execution errors
35    Query(String),
36
37    /// Configuration errors
38    Config(String),
39
40    /// Other errors that don't fit into specific categories
41    Other(String),
42}
43
44impl fmt::Display for MsSqlError {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        match self {
47            Self::Connection(e) => write!(f, "Connection error: {e}"),
48            Self::Lsn(e) => write!(f, "LSN error: {e}"),
49            Self::PrimaryKey(e) => write!(f, "Primary key error: {e}"),
50            Self::InvalidIdentifier(msg) => write!(f, "Invalid SQL identifier: {msg}"),
51            Self::Query(msg) => write!(f, "Query error: {msg}"),
52            Self::Config(msg) => write!(f, "Configuration error: {msg}"),
53            Self::Other(msg) => write!(f, "{msg}"),
54        }
55    }
56}
57
58impl std::error::Error for MsSqlError {}
59
60/// Connection-related error types
61#[derive(Debug)]
62pub enum ConnectionError {
63    /// Failed to establish initial connection
64    Failed(String),
65
66    /// Connection was lost/reset
67    Lost(String),
68
69    /// Connection timed out
70    Timeout(String),
71
72    /// Authentication failed
73    AuthenticationFailed(String),
74
75    /// Network unreachable
76    NetworkUnreachable(String),
77
78    /// Connection refused by server
79    Refused(String),
80
81    /// Too many consecutive errors indicating unhealthy connection
82    Unhealthy {
83        consecutive_errors: u32,
84        last_error: String,
85    },
86}
87
88impl fmt::Display for ConnectionError {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        match self {
91            Self::Failed(msg) => write!(f, "Failed to connect: {msg}"),
92            Self::Lost(msg) => write!(f, "Connection lost: {msg}"),
93            Self::Timeout(msg) => write!(f, "Connection timed out: {msg}"),
94            Self::AuthenticationFailed(msg) => write!(f, "Authentication failed: {msg}"),
95            Self::NetworkUnreachable(msg) => write!(f, "Network unreachable: {msg}"),
96            Self::Refused(msg) => write!(f, "Connection refused: {msg}"),
97            Self::Unhealthy {
98                consecutive_errors,
99                last_error,
100            } => {
101                write!(f, "Connection unhealthy after {consecutive_errors} consecutive errors: {last_error}")
102            }
103        }
104    }
105}
106
107/// LSN-related error types
108#[derive(Debug)]
109pub enum LsnError {
110    /// LSN is invalid (malformed or corrupted)
111    Invalid(String),
112
113    /// LSN is out of CDC retention range
114    OutOfRange(String),
115
116    /// LSN parsing failed
117    ParseFailed(String),
118
119    /// No LSN available from CDC
120    NotAvailable(String),
121}
122
123impl fmt::Display for LsnError {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Invalid(msg) => write!(f, "Invalid LSN: {msg}"),
127            Self::OutOfRange(msg) => write!(f, "LSN out of range: {msg}"),
128            Self::ParseFailed(msg) => write!(f, "Failed to parse LSN: {msg}"),
129            Self::NotAvailable(msg) => write!(f, "LSN not available: {msg}"),
130        }
131    }
132}
133
134/// Primary key related error types
135#[derive(Debug)]
136pub enum PrimaryKeyError {
137    /// No primary key configured for table
138    NotConfigured { table: String },
139
140    /// Primary key column not found in row
141    ColumnNotFound { table: String, column: String },
142
143    /// All primary key values are NULL
144    AllNull { table: String, columns: Vec<String> },
145}
146
147impl fmt::Display for PrimaryKeyError {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        match self {
150            Self::NotConfigured { table } => {
151                write!(
152                    f,
153                    "No primary key configured for table '{table}'. \
154                     Add a 'table_keys' configuration entry to specify the primary key columns."
155                )
156            }
157            Self::ColumnNotFound { table, column } => {
158                write!(
159                    f,
160                    "Primary key column '{column}' not found in row for table '{table}'. \
161                     Check that the column name in 'table_keys' matches the actual column name."
162                )
163            }
164            Self::AllNull { table, columns } => {
165                write!(
166                    f,
167                    "All primary key values are NULL for table '{table}' (columns: {columns:?}). \
168                     Cannot generate a stable element ID."
169                )
170            }
171        }
172    }
173}
174
175impl MsSqlError {
176    /// Check if this error is a connection-related error
177    pub fn is_connection_error(&self) -> bool {
178        matches!(self, Self::Connection(_))
179    }
180
181    /// Check if this error is an LSN-related error that can be recovered by resetting the checkpoint
182    pub fn is_recoverable_lsn_error(&self) -> bool {
183        matches!(
184            self,
185            Self::Lsn(LsnError::Invalid(_) | LsnError::OutOfRange(_))
186        )
187    }
188
189    /// Create a connection error from an underlying error message
190    pub fn from_connection_error(error: impl ToString) -> Self {
191        let error_str = error.to_string().to_lowercase();
192
193        if error_str.contains("timed out") || error_str.contains("timeout") {
194            Self::Connection(ConnectionError::Timeout(error.to_string()))
195        } else if error_str.contains("refused") {
196            Self::Connection(ConnectionError::Refused(error.to_string()))
197        } else if error_str.contains("unreachable") {
198            Self::Connection(ConnectionError::NetworkUnreachable(error.to_string()))
199        } else if error_str.contains("authentication") || error_str.contains("login") {
200            Self::Connection(ConnectionError::AuthenticationFailed(error.to_string()))
201        } else if error_str.contains("reset")
202            || error_str.contains("broken pipe")
203            || error_str.contains("closed")
204            || error_str.contains("eof")
205        {
206            Self::Connection(ConnectionError::Lost(error.to_string()))
207        } else {
208            Self::Connection(ConnectionError::Failed(error.to_string()))
209        }
210    }
211
212    /// Attempt to classify an anyhow::Error as an MsSqlError
213    ///
214    /// This checks if the error is already an MsSqlError (via downcast),
215    /// or attempts to classify it based on error message patterns.
216    pub fn classify(error: &anyhow::Error) -> Option<MsSqlErrorKind> {
217        // Try to downcast to MsSqlError first
218        if let Some(mssql_err) = error.downcast_ref::<MsSqlError>() {
219            return Some(match mssql_err {
220                MsSqlError::Connection(_) => MsSqlErrorKind::Connection,
221                MsSqlError::Lsn(LsnError::Invalid(_) | LsnError::OutOfRange(_)) => {
222                    MsSqlErrorKind::RecoverableLsn
223                }
224                MsSqlError::Lsn(_) => MsSqlErrorKind::Other,
225                MsSqlError::PrimaryKey(_) => MsSqlErrorKind::Other,
226                MsSqlError::InvalidIdentifier(_) => MsSqlErrorKind::Other,
227                MsSqlError::Query(_) => MsSqlErrorKind::Other,
228                MsSqlError::Config(_) => MsSqlErrorKind::Other,
229                MsSqlError::Other(_) => MsSqlErrorKind::Other,
230            });
231        }
232
233        // Fall back to string-based classification for errors from external libraries
234        let error_str = error.to_string().to_lowercase();
235
236        // Check for connection errors
237        if error_str.contains("connection")
238            || error_str.contains("broken pipe")
239            || error_str.contains("reset by peer")
240            || error_str.contains("timed out")
241            || error_str.contains("network")
242            || error_str.contains("socket")
243            || error_str.contains("eof")
244            || error_str.contains("closed")
245            || error_str.contains("refused")
246            || error_str.contains("unreachable")
247        {
248            return Some(MsSqlErrorKind::Connection);
249        }
250
251        // Check for LSN errors (from SQL Server error messages)
252        if error_str.contains("lsn")
253            && (error_str.contains("invalid") || error_str.contains("out of range"))
254        {
255            return Some(MsSqlErrorKind::RecoverableLsn);
256        }
257
258        None
259    }
260}
261
262/// Simplified error classification for control flow decisions
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264pub enum MsSqlErrorKind {
265    /// Connection-related error requiring reconnection
266    Connection,
267
268    /// LSN error that can be recovered by resetting the checkpoint
269    RecoverableLsn,
270
271    /// Other errors
272    Other,
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    #[test]
280    fn test_connection_error_display() {
281        let err = MsSqlError::Connection(ConnectionError::Lost("connection reset".to_string()));
282        assert!(err.to_string().contains("Connection lost"));
283        assert!(err.is_connection_error());
284    }
285
286    #[test]
287    fn test_lsn_error_display() {
288        let err = MsSqlError::Lsn(LsnError::OutOfRange("LSN too old".to_string()));
289        assert!(err.to_string().contains("out of range"));
290        assert!(err.is_recoverable_lsn_error());
291    }
292
293    #[test]
294    fn test_primary_key_error_display() {
295        let err = MsSqlError::PrimaryKey(PrimaryKeyError::NotConfigured {
296            table: "orders".to_string(),
297        });
298        assert!(err.to_string().contains("No primary key configured"));
299        assert!(err.to_string().contains("orders"));
300    }
301
302    #[test]
303    fn test_classify_connection_error() {
304        let err = anyhow::anyhow!("connection reset by peer");
305        assert_eq!(MsSqlError::classify(&err), Some(MsSqlErrorKind::Connection));
306
307        let err = anyhow::anyhow!("broken pipe");
308        assert_eq!(MsSqlError::classify(&err), Some(MsSqlErrorKind::Connection));
309
310        let err = anyhow::anyhow!("network unreachable");
311        assert_eq!(MsSqlError::classify(&err), Some(MsSqlErrorKind::Connection));
312    }
313
314    #[test]
315    fn test_classify_lsn_error() {
316        let err = anyhow::anyhow!("The specified LSN is invalid or out of range");
317        assert_eq!(
318            MsSqlError::classify(&err),
319            Some(MsSqlErrorKind::RecoverableLsn)
320        );
321    }
322
323    #[test]
324    fn test_classify_unknown_error() {
325        let err = anyhow::anyhow!("some random error");
326        assert_eq!(MsSqlError::classify(&err), None);
327    }
328
329    #[test]
330    fn test_from_connection_error() {
331        let err = MsSqlError::from_connection_error("connection timed out");
332        assert!(matches!(
333            err,
334            MsSqlError::Connection(ConnectionError::Timeout(_))
335        ));
336
337        let err = MsSqlError::from_connection_error("connection refused");
338        assert!(matches!(
339            err,
340            MsSqlError::Connection(ConnectionError::Refused(_))
341        ));
342
343        let err = MsSqlError::from_connection_error("broken pipe");
344        assert!(matches!(
345            err,
346            MsSqlError::Connection(ConnectionError::Lost(_))
347        ));
348    }
349}