1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//! Backend-agnostic error types for infrastructure operations.
use thiserror::Error;
use time::OffsetDateTime;
/// Convenience alias for [`std::result::Result`] with the error type fixed to
/// this module's [`Error`], used as the return type of infrastructure operations.
pub type Result<T> = std::result::Result<T, Error>;
/// Failure modes surfaced by infrastructure operations.
///
/// Errors raised by a concrete backend (sqlx, redis, etc.) are not exposed
/// directly; they travel boxed inside [`Error::Backend`].
#[derive(Debug, Error)]
pub enum Error {
    /// An error produced by the underlying backend (sqlx::Error, redis::Error, etc.),
    /// type-erased behind a boxed trait object.
    #[error("backend error: {0}")]
    Backend(Box<dyn std::error::Error + Send + Sync>),

    /// Serializing to or deserializing from JSON failed.
    ///
    /// `#[from]` generates the `From<serde_json::Error>` conversion, so `?`
    /// converts a `serde_json::Error` into this variant.
    #[error("json error: {0}")]
    Json(#[from] serde_json::Error),

    /// The requested key does not exist in the cache or counter.
    #[error("key not found: {0}")]
    NotFound(String),

    /// The requested job does not exist in the queue.
    ///
    /// Since the `PARTITION BY LIST (queue_name)` migration, every id-based
    /// JobQueue operation uses `queue_name` to scope its lookup to a single
    /// partition. This variant can therefore mean either that no job with
    /// the given id exists in that queue (the genuine case), or that the
    /// caller supplied the wrong `queue_name` for an id that does exist
    /// (a programming bug). Both fields are carried so operators can tell
    /// the two apart in logs.
    #[error("job not found: queue={queue_name}, id={id}")]
    JobNotFound {
        /// Queue name supplied by the caller.
        queue_name: String,
        /// Job id supplied by the caller.
        id: String,
    },

    /// The rate limit was exceeded.
    #[error("rate limit exceeded: {current} requests (limit: {limit})")]
    RateLimitExceeded {
        /// Weighted number of requests counted so far.
        current: f64,
        /// Maximum number of requests allowed.
        limit: i32,
        /// Moment at which the rate limit resets (not included in the message).
        reset_at: OffsetDateTime,
    },

    /// The lock could not be acquired because another session already holds it.
    #[error("failed to acquire lock: {0}")]
    LockFailed(String),

    /// Waiting for the lock timed out; the payload is rendered as milliseconds.
    #[error("lock acquisition timed out after {0}ms")]
    LockTimeout(i32),

    /// The payload exceeds the backend's notification size limit.
    #[error("payload too large: {size} bytes (max: {max_size})")]
    PayloadTooLarge {
        /// Size of the rejected payload, in bytes.
        size: usize,
        /// Largest payload size accepted, in bytes.
        max_size: usize,
    },

    /// An internal channel was closed because its background task exited.
    #[error("channel closed: {0}")]
    ChannelClosed(String),
}
impl Error {
/// Wrap a backend-specific error.
pub fn backend(err: impl std::error::Error + Send + Sync + 'static) -> Self {
Self::Backend(Box::new(err))
}
/// Returns true if this is a rate limit exceeded error.
pub fn is_rate_limited(&self) -> bool {
matches!(self, Error::RateLimitExceeded { .. })
}
/// Returns true if this is a not found error.
pub fn is_not_found(&self) -> bool {
matches!(self, Error::NotFound(_) | Error::JobNotFound { .. })
}
/// Returns true if this is a lock-related error.
pub fn is_lock_error(&self) -> bool {
matches!(self, Error::LockFailed(_) | Error::LockTimeout(_))
}
/// Extract rate limit reset time if this is a rate limit error.
pub fn reset_at(&self) -> Option<OffsetDateTime> {
match self {
Error::RateLimitExceeded { reset_at, .. } => Some(*reset_at),
_ => None,
}
}
}