replication_engine/
error.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Error types for the replication engine.
5//!
6//! This module defines the error types used throughout the replication engine.
7//! Errors are categorized by their source (Redis, SQLite, etc.) and include
8//! context to help with debugging.
9//!
10//! # Error Categories
11//!
12//! | Error Type | Retryable | Description |
13//! |------------|-----------|-------------|
14//! | `Redis` | Yes | Network errors, timeouts, connection failures |
15//! | `PeerConnection` | Yes | Peer unreachable, connection dropped |
16//! | `SyncEngine` | Yes | Sync engine temporarily unavailable |
17//! | `CursorStore` | No | Local SQLite errors (needs operator attention) |
18//! | `Config` | No | Configuration invalid |
19//! | `Decompression` | No | Data corruption (zstd decode failed) |
20//! | `StreamParse` | No | Malformed CDC event |
21//! | `InvalidState` | No | Engine state machine violation |
22//! | `Shutdown` | No | Engine is shutting down |
23//! | `Internal` | No | Unexpected internal error |
24//!
25//! # Retry Behavior
26//!
//! Use [`ReplicationError::is_retryable()`] to determine if an operation
//! should be retried with backoff. Retryable errors indicate transient
//! network or availability issues. Non-retryable errors indicate bugs,
//! configuration problems, data corruption, local cursor-store failures,
//! or an engine that is shutting down.
31
32use thiserror::Error;
33
34/// Result type alias for replication operations.
35pub type Result<T> = std::result::Result<T, ReplicationError>;
36
37/// Errors that can occur during replication.
38///
39/// Each variant includes context about where the error occurred.
40/// Use [`is_retryable()`](Self::is_retryable) to check if the operation
41/// should be retried.
42#[derive(Error, Debug)]
43pub enum ReplicationError {
44    /// Redis connection or command error.
45    ///
46    /// Occurs when communicating with a peer's Redis instance.
47    /// These are typically retryable (network timeouts, connection drops).
48    #[error("Redis error ({operation}): {message}")]
49    Redis {
50        operation: String,
51        message: String,
52        #[source]
53        source: Option<redis::RedisError>,
54    },
55
56    /// SQLite error during cursor persistence.
57    ///
58    /// Occurs when reading/writing cursor positions to SQLite.
59    /// Not retryable - indicates local database issues that need attention.
60    #[error("Cursor store error: {0}")]
61    CursorStore(#[from] sqlx::Error),
62
63    /// Invalid or missing configuration.
64    ///
65    /// Occurs during engine initialization if config is malformed.
66    /// Not retryable - fix the configuration and restart.
67    #[error("Configuration error: {0}")]
68    Config(String),
69
70    /// Peer connection failure.
71    ///
72    /// Occurs when a peer is unreachable or the connection drops.
73    /// Retryable with exponential backoff.
74    #[error("Peer connection error ({peer_id}): {message}")]
75    PeerConnection { peer_id: String, message: String },
76
77    /// Zstd decompression failure.
78    ///
79    /// Occurs when CDC event payload is corrupted or truncated.
80    /// Not retryable - the data is corrupt at the source.
81    #[error("Decompression error: {0}")]
82    Decompression(String),
83
84    /// CDC stream event parsing failure.
85    ///
86    /// Occurs when a Redis stream entry has unexpected format.
87    /// Not retryable - the event is malformed at the source.
88    #[error("Stream parse error: {0}")]
89    StreamParse(String),
90
91    /// Sync engine communication failure.
92    ///
93    /// Occurs when submitting to the local sync engine fails.
94    /// Retryable - sync engine may be temporarily overloaded.
95    #[error("Sync engine error: {0}")]
96    SyncEngine(String),
97
98    /// Engine state machine violation.
99    ///
100    /// Occurs when an operation is attempted in the wrong state
101    /// (e.g., calling `start()` on an already-running engine).
102    /// Not retryable - indicates a bug in the caller.
103    #[error("Invalid state: expected {expected}, got {actual}")]
104    InvalidState { expected: String, actual: String },
105
106    /// Shutdown in progress.
107    ///
108    /// Returned when operations are attempted during shutdown.
109    /// Not retryable - engine is terminating.
110    #[error("Shutdown in progress")]
111    Shutdown,
112
113    /// Unexpected internal error.
114    ///
115    /// Catch-all for errors that shouldn't happen.
116    /// Not retryable - indicates a bug that needs investigation.
117    #[error("Internal error: {0}")]
118    Internal(String),
119}
120
121impl ReplicationError {
122    /// Create a Redis error from a redis::RedisError
123    pub fn redis(operation: impl Into<String>, source: redis::RedisError) -> Self {
124        Self::Redis {
125            operation: operation.into(),
126            message: source.to_string(),
127            source: Some(source),
128        }
129    }
130
131    /// Create a Redis error without source
132    pub fn redis_msg(operation: impl Into<String>, message: impl Into<String>) -> Self {
133        Self::Redis {
134            operation: operation.into(),
135            message: message.into(),
136            source: None,
137        }
138    }
139
140    /// Check if this error is retryable
141    pub fn is_retryable(&self) -> bool {
142        match self {
143            Self::Redis { .. } => true, // Network errors are retryable
144            Self::PeerConnection { .. } => true,
145            Self::SyncEngine(_) => true,
146            Self::CursorStore(_) => false, // Local DB issues need attention
147            Self::Config(_) => false,
148            Self::Decompression(_) => false, // Data corruption
149            Self::StreamParse(_) => false,   // Data corruption
150            Self::InvalidState { .. } => false,
151            Self::Shutdown => false,
152            Self::Internal(_) => false,
153        }
154    }
155}
156
157impl From<redis::RedisError> for ReplicationError {
158    fn from(e: redis::RedisError) -> Self {
159        Self::redis("unknown", e)
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Variants expected to be retryable ----

    #[test]
    fn test_is_retryable_redis() {
        let error = ReplicationError::redis_msg("XREAD", "connection reset");
        assert!(error.is_retryable());

        let rendered = error.to_string();
        assert!(rendered.contains("XREAD"));
    }

    #[test]
    fn test_is_retryable_peer_connection() {
        let error = ReplicationError::PeerConnection {
            peer_id: String::from("peer-1"),
            message: String::from("connection refused"),
        };
        assert!(error.is_retryable());

        let rendered = error.to_string();
        assert!(rendered.contains("peer-1"));
    }

    #[test]
    fn test_is_retryable_sync_engine() {
        let error = ReplicationError::SyncEngine(String::from("overloaded"));
        assert!(error.is_retryable());
    }

    // ---- Variants expected to be non-retryable ----

    #[test]
    fn test_not_retryable_config() {
        let error = ReplicationError::Config(String::from("invalid peer URL"));
        assert!(!error.is_retryable());
    }

    #[test]
    fn test_not_retryable_decompression() {
        let error = ReplicationError::Decompression(String::from("invalid zstd header"));
        assert!(!error.is_retryable());
    }

    #[test]
    fn test_not_retryable_stream_parse() {
        let error = ReplicationError::StreamParse(String::from("missing op field"));
        assert!(!error.is_retryable());
    }

    #[test]
    fn test_not_retryable_invalid_state() {
        let error = ReplicationError::InvalidState {
            expected: String::from("Running"),
            actual: String::from("Stopped"),
        };
        assert!(!error.is_retryable());

        // Both the expected and the actual state must show up in the message.
        let rendered = error.to_string();
        assert!(rendered.contains("Running"));
        assert!(rendered.contains("Stopped"));
    }

    #[test]
    fn test_not_retryable_shutdown() {
        let error = ReplicationError::Shutdown;
        assert!(!error.is_retryable());
    }

    #[test]
    fn test_not_retryable_internal() {
        let error = ReplicationError::Internal(String::from("unexpected panic"));
        assert!(!error.is_retryable());
    }

    // ---- Display formatting ----

    #[test]
    fn test_redis_error_formatting() {
        let error = ReplicationError::Redis {
            operation: String::from("XRANGE"),
            message: String::from("timeout"),
            source: None,
        };

        let rendered = error.to_string();
        for fragment in ["Redis error", "XRANGE", "timeout"] {
            assert!(rendered.contains(fragment));
        }
    }
}