replication_engine/error.rs
// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.
3
//! Error types for the replication engine.
//!
//! This module defines the error types used throughout the replication engine.
//! Errors are categorized by their source (Redis, SQLite, etc.) and carry
//! context to help with debugging.
//!
//! # Error Categories
//!
//! | Error Type | Retryable | Description |
//! |------------|-----------|-------------|
//! | `Redis` | Yes | Any Redis error (network errors, timeouts, connection failures) |
//! | `PeerConnection` | Yes | Peer unreachable, connection dropped |
//! | `SyncEngine` | Yes | Sync engine temporarily unavailable |
//! | `CursorStore` | No | Local SQLite errors (needs operator attention) |
//! | `Config` | No | Invalid configuration |
//! | `Decompression` | No | Data corruption (zstd decode failed) |
//! | `StreamParse` | No | Malformed CDC event |
//! | `InvalidState` | No | Engine state machine violation |
//! | `Shutdown` | No | Engine is shutting down |
//! | `Internal` | No | Unexpected internal error |
//!
//! # Retry Behavior
//!
//! Use [`ReplicationError::is_retryable()`] to determine whether an operation
//! should be retried with backoff. Retryable errors indicate transient
//! network or availability issues. Non-retryable errors indicate bugs,
//! configuration problems, data corruption, local database issues, or an
//! engine that is shutting down.
31
32use thiserror::Error;
33
34/// Result type alias for replication operations.
35pub type Result<T> = std::result::Result<T, ReplicationError>;
36
37/// Errors that can occur during replication.
38///
39/// Each variant includes context about where the error occurred.
40/// Use [`is_retryable()`](Self::is_retryable) to check if the operation
41/// should be retried.
42#[derive(Error, Debug)]
43pub enum ReplicationError {
44 /// Redis connection or command error.
45 ///
46 /// Occurs when communicating with a peer's Redis instance.
47 /// These are typically retryable (network timeouts, connection drops).
48 #[error("Redis error ({operation}): {message}")]
49 Redis {
50 operation: String,
51 message: String,
52 #[source]
53 source: Option<redis::RedisError>,
54 },
55
56 /// SQLite error during cursor persistence.
57 ///
58 /// Occurs when reading/writing cursor positions to SQLite.
59 /// Not retryable - indicates local database issues that need attention.
60 #[error("Cursor store error: {0}")]
61 CursorStore(#[from] sqlx::Error),
62
63 /// Invalid or missing configuration.
64 ///
65 /// Occurs during engine initialization if config is malformed.
66 /// Not retryable - fix the configuration and restart.
67 #[error("Configuration error: {0}")]
68 Config(String),
69
70 /// Peer connection failure.
71 ///
72 /// Occurs when a peer is unreachable or the connection drops.
73 /// Retryable with exponential backoff.
74 #[error("Peer connection error ({peer_id}): {message}")]
75 PeerConnection { peer_id: String, message: String },
76
77 /// Zstd decompression failure.
78 ///
79 /// Occurs when CDC event payload is corrupted or truncated.
80 /// Not retryable - the data is corrupt at the source.
81 #[error("Decompression error: {0}")]
82 Decompression(String),
83
84 /// CDC stream event parsing failure.
85 ///
86 /// Occurs when a Redis stream entry has unexpected format.
87 /// Not retryable - the event is malformed at the source.
88 #[error("Stream parse error: {0}")]
89 StreamParse(String),
90
91 /// Sync engine communication failure.
92 ///
93 /// Occurs when submitting to the local sync engine fails.
94 /// Retryable - sync engine may be temporarily overloaded.
95 #[error("Sync engine error: {0}")]
96 SyncEngine(String),
97
98 /// Engine state machine violation.
99 ///
100 /// Occurs when an operation is attempted in the wrong state
101 /// (e.g., calling `start()` on an already-running engine).
102 /// Not retryable - indicates a bug in the caller.
103 #[error("Invalid state: expected {expected}, got {actual}")]
104 InvalidState { expected: String, actual: String },
105
106 /// Shutdown in progress.
107 ///
108 /// Returned when operations are attempted during shutdown.
109 /// Not retryable - engine is terminating.
110 #[error("Shutdown in progress")]
111 Shutdown,
112
113 /// Unexpected internal error.
114 ///
115 /// Catch-all for errors that shouldn't happen.
116 /// Not retryable - indicates a bug that needs investigation.
117 #[error("Internal error: {0}")]
118 Internal(String),
119}
120
121impl ReplicationError {
122 /// Create a Redis error from a redis::RedisError
123 pub fn redis(operation: impl Into<String>, source: redis::RedisError) -> Self {
124 Self::Redis {
125 operation: operation.into(),
126 message: source.to_string(),
127 source: Some(source),
128 }
129 }
130
131 /// Create a Redis error without source
132 pub fn redis_msg(operation: impl Into<String>, message: impl Into<String>) -> Self {
133 Self::Redis {
134 operation: operation.into(),
135 message: message.into(),
136 source: None,
137 }
138 }
139
140 /// Check if this error is retryable
141 pub fn is_retryable(&self) -> bool {
142 match self {
143 Self::Redis { .. } => true, // Network errors are retryable
144 Self::PeerConnection { .. } => true,
145 Self::SyncEngine(_) => true,
146 Self::CursorStore(_) => false, // Local DB issues need attention
147 Self::Config(_) => false,
148 Self::Decompression(_) => false, // Data corruption
149 Self::StreamParse(_) => false, // Data corruption
150 Self::InvalidState { .. } => false,
151 Self::Shutdown => false,
152 Self::Internal(_) => false,
153 }
154 }
155}
156
157impl From<redis::RedisError> for ReplicationError {
158 fn from(e: redis::RedisError) -> Self {
159 Self::redis("unknown", e)
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a real `redis::RedisError` via its `From<std::io::Error>` impl.
    fn io_redis_error(msg: &'static str) -> redis::RedisError {
        redis::RedisError::from(std::io::Error::new(
            std::io::ErrorKind::ConnectionReset,
            msg,
        ))
    }

    #[test]
    fn test_is_retryable_redis() {
        let err = ReplicationError::redis_msg("XREAD", "connection reset");
        assert!(err.is_retryable());
        assert!(err.to_string().contains("XREAD"));
    }

    #[test]
    fn test_redis_msg_has_no_source() {
        let err = ReplicationError::redis_msg("XREAD", "connection reset");
        assert!(std::error::Error::source(&err).is_none());
    }

    #[test]
    fn test_redis_constructor_keeps_source() {
        let err = ReplicationError::redis("XREAD", io_redis_error("connection reset"));
        assert!(err.is_retryable());
        let msg = err.to_string();
        assert!(msg.contains("XREAD"));
        assert!(msg.contains("connection reset"));
        assert!(std::error::Error::source(&err).is_some());
    }

    #[test]
    fn test_from_redis_error_uses_unknown_operation() {
        let err: ReplicationError = io_redis_error("broken pipe").into();
        assert!(err.is_retryable());
        assert!(err.to_string().contains("unknown"));
        assert!(std::error::Error::source(&err).is_some());
    }

    #[test]
    fn test_is_retryable_peer_connection() {
        let err = ReplicationError::PeerConnection {
            peer_id: "peer-1".to_string(),
            message: "connection refused".to_string(),
        };
        assert!(err.is_retryable());
        assert!(err.to_string().contains("peer-1"));
    }

    #[test]
    fn test_is_retryable_sync_engine() {
        let err = ReplicationError::SyncEngine("overloaded".to_string());
        assert!(err.is_retryable());
    }

    #[test]
    fn test_not_retryable_cursor_store() {
        let err = ReplicationError::from(sqlx::Error::RowNotFound);
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Cursor store error"));
        assert!(std::error::Error::source(&err).is_some());
    }

    #[test]
    fn test_not_retryable_config() {
        let err = ReplicationError::Config("invalid peer URL".to_string());
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_not_retryable_decompression() {
        let err = ReplicationError::Decompression("invalid zstd header".to_string());
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_not_retryable_stream_parse() {
        let err = ReplicationError::StreamParse("missing op field".to_string());
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_not_retryable_invalid_state() {
        let err = ReplicationError::InvalidState {
            expected: "Running".to_string(),
            actual: "Stopped".to_string(),
        };
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Running"));
        assert!(err.to_string().contains("Stopped"));
    }

    #[test]
    fn test_not_retryable_shutdown() {
        let err = ReplicationError::Shutdown;
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_not_retryable_internal() {
        let err = ReplicationError::Internal("unexpected panic".to_string());
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_redis_error_formatting() {
        let err = ReplicationError::Redis {
            operation: "XRANGE".to_string(),
            message: "timeout".to_string(),
            source: None,
        };
        let msg = err.to_string();
        assert!(msg.contains("Redis error"));
        assert!(msg.contains("XRANGE"));
        assert!(msg.contains("timeout"));
    }
}