// common/coordinator/error.rs

/// Errors that can occur during write coordination.
///
/// The `W` parameter carries the original write value back to the caller on
/// retryable errors ([`Backpressure`](WriteError::Backpressure) and
/// [`TimeoutError`](WriteError::TimeoutError)), allowing retry without
/// cloning. Methods that cannot return the write use the default `W = ()`.
pub enum WriteError<W = ()> {
    /// The write queue is full and backpressure is being applied.
    ///
    /// Contains the write that could not be enqueued.
    Backpressure(W),
    /// The write queue timed out while awaiting space.
    ///
    /// Contains the write that could not be enqueued.
    TimeoutError(W),
    /// The coordinator has been dropped or shut down.
    Shutdown,
    /// Error applying the write to the delta.
    ///
    /// The `u64` is bound as `epoch` elsewhere in this file and is rendered
    /// as `@<epoch>` by the `Display` impl; the `String` is the error message.
    ApplyError(u64, String),
    /// Error flushing the delta to storage; the `String` is the error message.
    FlushError(String),
    /// Internal error; the `String` is the error message.
    Internal(String),
}
23
24impl<W> WriteError<W> {
25    /// Extracts the write value from retryable error variants.
26    ///
27    /// Returns `Some(W)` for `Backpressure` and `TimeoutError`, `None` for
28    /// all other variants.
29    pub fn into_inner(self) -> Option<W> {
30        match self {
31            WriteError::Backpressure(w) | WriteError::TimeoutError(w) => Some(w),
32            _ => None,
33        }
34    }
35
36    /// Discards the write value, converting to a `WriteError<()>`.
37    pub fn discard_inner(self) -> WriteError {
38        match self {
39            WriteError::Backpressure(_) => WriteError::Backpressure(()),
40            WriteError::TimeoutError(_) => WriteError::TimeoutError(()),
41            WriteError::Shutdown => WriteError::Shutdown,
42            WriteError::ApplyError(epoch, msg) => WriteError::ApplyError(epoch, msg),
43            WriteError::FlushError(msg) => WriteError::FlushError(msg),
44            WriteError::Internal(msg) => WriteError::Internal(msg),
45        }
46    }
47}
48
49impl<W> std::fmt::Debug for WriteError<W> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            WriteError::Backpressure(_) => write!(f, "Backpressure(..)"),
53            WriteError::TimeoutError(_) => write!(f, "TimeoutError(..)"),
54            WriteError::Shutdown => write!(f, "Shutdown"),
55            WriteError::ApplyError(epoch, msg) => {
56                f.debug_tuple("ApplyError").field(epoch).field(msg).finish()
57            }
58            WriteError::FlushError(msg) => f.debug_tuple("FlushError").field(msg).finish(),
59            WriteError::Internal(msg) => f.debug_tuple("Internal").field(msg).finish(),
60        }
61    }
62}
63
64impl<W> std::fmt::Display for WriteError<W> {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            WriteError::Backpressure(_) => write!(f, "write queue is full, backpressure applied"),
68            WriteError::TimeoutError(_) => {
69                write!(f, "timed out waiting for space in write queue")
70            }
71            WriteError::Shutdown => write!(f, "coordinator has been dropped/shutdown"),
72            WriteError::ApplyError(epoch, msg) => {
73                write!(f, "error applying write @{}: {}", epoch, msg)
74            }
75            WriteError::FlushError(msg) => write!(f, "error flushing delta: {}", msg),
76            WriteError::Internal(msg) => write!(f, "internal error: {}", msg),
77        }
78    }
79}
80
81impl<W> std::error::Error for WriteError<W> {}
82
83/// Result type for write operations.
84pub type WriteResult<T> = Result<T, WriteError>;