osproxy_sink/error.rs
1//! Failures returned by a [`Sink`](crate::Sink).
2
3use osproxy_core::{Epoch, ErrorCode};
4use thiserror::Error;
5
6/// A failure applying a write at the sink.
7#[non_exhaustive]
8#[derive(Debug, Error)]
9pub enum SinkError {
10 /// The upstream cluster returned an error status for the whole request
11 /// (not a per-item failure, which is carried in the ack).
12 #[error("upstream returned {status} (retryable={retryable})")]
13 Upstream {
14 /// The upstream HTTP status.
15 status: u16,
16 /// Whether the caller may retry.
17 retryable: bool,
18 },
19
20 /// The write could not be delivered (connection reset, timeout, TLS). The
21 /// message is a shape/category description, never tenant data.
22 #[error("transport failure: {kind}")]
23 Transport {
24 /// A short, value-free description of the transport failure.
25 kind: &'static str,
26 },
27
28 /// The write was resolved against an epoch that is stale for a migrating
29 /// partition; the caller must re-resolve and retry (`docs/06` ยง2). Wired in
30 /// M5; defined here so the sink contract is stable.
31 #[error("stale epoch {stamped} (current {current})")]
32 StaleEpoch {
33 /// The epoch the rejected write carried.
34 stamped: Epoch,
35 /// The current epoch the sink expected.
36 current: Epoch,
37 },
38}
39
40impl SinkError {
41 /// The stable [`ErrorCode`] for this failure.
42 #[must_use]
43 pub fn code(&self) -> ErrorCode {
44 match self {
45 Self::Upstream { .. } | Self::Transport { .. } => ErrorCode::UpstreamFailed,
46 Self::StaleEpoch { .. } => ErrorCode::StaleEpoch,
47 }
48 }
49
50 /// Whether the caller may retry (possibly after re-resolving placement).
51 #[must_use]
52 pub fn retryable(&self) -> bool {
53 match self {
54 Self::Upstream { retryable, .. } => *retryable,
55 // A transport failure is transient; a stale epoch is retryable after
56 // the client re-resolves the placement.
57 Self::Transport { .. } | Self::StaleEpoch { .. } => true,
58 }
59 }
60}
61
62#[cfg(test)]
63mod tests {
64 use super::*;
65
66 #[test]
67 fn codes_and_retryability() {
68 assert_eq!(
69 SinkError::Upstream {
70 status: 503,
71 retryable: true
72 }
73 .code(),
74 ErrorCode::UpstreamFailed
75 );
76 assert!(SinkError::Transport { kind: "reset" }.retryable());
77 assert!(SinkError::StaleEpoch {
78 stamped: Epoch::new(1),
79 current: Epoch::new(2)
80 }
81 .retryable());
82 assert!(!SinkError::Upstream {
83 status: 400,
84 retryable: false
85 }
86 .retryable());
87 }
88}