Skip to main content

osproxy_sink/
error.rs

1//! Failures returned by a [`Sink`](crate::Sink).
2
3use osproxy_core::{Epoch, ErrorCode};
4use thiserror::Error;
5
6/// A failure applying a write at the sink.
7#[non_exhaustive]
8#[derive(Debug, Error)]
9pub enum SinkError {
10    /// The upstream cluster returned an error status for the whole request
11    /// (not a per-item failure, which is carried in the ack).
12    #[error("upstream returned {status} (retryable={retryable})")]
13    Upstream {
14        /// The upstream HTTP status.
15        status: u16,
16        /// Whether the caller may retry.
17        retryable: bool,
18    },
19
20    /// The write could not be delivered (connection reset, timeout, TLS). The
21    /// message is a shape/category description, never tenant data.
22    #[error("transport failure: {kind}")]
23    Transport {
24        /// A short, value-free description of the transport failure.
25        kind: &'static str,
26    },
27
28    /// The write was resolved against an epoch that is stale for a migrating
29    /// partition; the caller must re-resolve and retry (`docs/06` ยง2). Wired in
30    /// M5; defined here so the sink contract is stable.
31    #[error("stale epoch {stamped} (current {current})")]
32    StaleEpoch {
33        /// The epoch the rejected write carried.
34        stamped: Epoch,
35        /// The current epoch the sink expected.
36        current: Epoch,
37    },
38}
39
40impl SinkError {
41    /// The stable [`ErrorCode`] for this failure.
42    #[must_use]
43    pub fn code(&self) -> ErrorCode {
44        match self {
45            Self::Upstream { .. } | Self::Transport { .. } => ErrorCode::UpstreamFailed,
46            Self::StaleEpoch { .. } => ErrorCode::StaleEpoch,
47        }
48    }
49
50    /// Whether the caller may retry (possibly after re-resolving placement).
51    #[must_use]
52    pub fn retryable(&self) -> bool {
53        match self {
54            Self::Upstream { retryable, .. } => *retryable,
55            // A transport failure is transient; a stale epoch is retryable after
56            // the client re-resolves the placement.
57            Self::Transport { .. } | Self::StaleEpoch { .. } => true,
58        }
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the code reported for an upstream failure and the retry verdict
    /// of a transport failure, a stale epoch, and a non-retryable upstream
    /// rejection.
    #[test]
    fn codes_and_retryability() {
        let unavailable = SinkError::Upstream {
            status: 503,
            retryable: true,
        };
        assert_eq!(unavailable.code(), ErrorCode::UpstreamFailed);

        let reset = SinkError::Transport { kind: "reset" };
        assert!(reset.retryable());

        let stale = SinkError::StaleEpoch {
            stamped: Epoch::new(1),
            current: Epoch::new(2),
        };
        assert!(stale.retryable());

        let bad_request = SinkError::Upstream {
            status: 400,
            retryable: false,
        };
        assert!(!bad_request.retryable());
    }
}