1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
use std::fmt::Formatter;
use std::ops::Bound;

use anyerror::AnyError;

use crate::storage::SnapshotSignature;
use crate::LogId;
use crate::NodeId;
use crate::Vote;

/// Conversion of a fallible result into a `Result<T, StorageError<NID>>`.
///
/// The error side is turned into the `StorageError::IO` variant; the success value is passed
/// through unchanged.
pub trait ToStorageResult<NID, T>
where NID: NodeId
{
    /// Convert `Result<T, E>` to `Result<T, StorageError<NID>>`, wrapping the error in a
    /// `StorageIOError`.
    ///
    /// `f` provides the error context, an `(ErrorSubject, ErrorVerb)` pair, used for building
    /// the `StorageIOError`.
    fn sto_res<F>(self, f: F) -> Result<T, StorageError<NID>>
    where F: FnOnce() -> (ErrorSubject<NID>, ErrorVerb);
}

impl<NID, T> ToStorageResult<NID, T> for Result<T, std::io::Error>
where NID: NodeId
{
    fn sto_res<F>(self, f: F) -> Result<T, StorageError<NID>>
    where F: FnOnce() -> (ErrorSubject<NID>, ErrorVerb) {
        match self {
            Ok(x) => Ok(x),
            Err(e) => {
                let (subject, verb) = f();
                let io_err = StorageIOError::new(subject, verb, AnyError::new(&e));
                Err(io_err.into())
            }
        }
    }
}

/// An error that occurs when the RaftStore impl runs a defensive check of its input or output.
/// E.g. re-applying a log entry is a violation that may be a potential bug.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct DefensiveError<NID>
where NID: NodeId
{
    /// The subject that violates store defensive check, e.g. hard-state, log or state machine.
    pub subject: ErrorSubject<NID>,

    /// The description of the violation.
    pub violation: Violation<NID>,

    /// Optional backtrace rendered as a string; `DefensiveError::new` fills it with the result
    /// of `anyerror::backtrace_str()`.
    pub backtrace: Option<String>,
}

impl<NID> DefensiveError<NID>
where NID: NodeId
{
    pub fn new(subject: ErrorSubject<NID>, violation: Violation<NID>) -> Self {
        Self {
            subject,
            violation,
            backtrace: anyerror::backtrace_str(),
        }
    }
}

impl<NID> std::fmt::Display for DefensiveError<NID>
where NID: NodeId
{
    /// Render as `'<subject:?>' violates: '<violation>'`; the backtrace is not included.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            subject, violation, ..
        } = self;

        write!(f, "'{:?}' violates: '{}'", subject, violation)
    }
}

/// The subject an error is about, e.g. the vote, the logs, the state machine or a snapshot.
///
/// It is carried by both `DefensiveError` and `StorageIOError`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum ErrorSubject<NID>
where NID: NodeId
{
    /// A general storage error.
    Store,

    /// HardState related error.
    Vote,

    /// Error that happened when operating on a series of log entries.
    Logs,

    /// Error about a single log entry.
    Log(LogId<NID>),

    /// Error about a single log entry without knowing the log term.
    LogIndex(u64),

    /// Error that happened when applying a log entry.
    Apply(LogId<NID>),

    /// Error that happened when operating on the state machine.
    StateMachine,

    /// Error that happened when operating on a snapshot.
    Snapshot(SnapshotSignature<NID>),

    /// No subject is specified.
    None,
}

/// What the store was doing when an error occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum ErrorVerb {
    /// The error occurred during a read.
    Read,
    /// The error occurred during a write.
    Write,
    /// The error occurred during a seek.
    Seek,
    /// The error occurred during a delete.
    Delete,
}

/// Violations a store would return when running defensive check.
///
/// The `Display` text of every variant is produced by its `#[error(...)]` attribute.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum Violation<NID: NodeId> {
    /// A vote change is not ascending: `curr` is the current vote, `to` is the value it was
    /// about to be changed to.
    #[error("term can only be change to a greater value, current: {curr}, change to {to}")]
    VoteNotAscending { curr: Vote<NID>, to: Vote<NID> },

    /// `voted_for` would change from one `Some()` to another `Some()`: `curr` is the current
    /// vote, `to` is the value it was about to be changed to.
    #[error("voted_for can not change from Some() to other Some(), current: {curr:?}, change to {to:?}")]
    NonIncrementalVote { curr: Vote<NID>, to: Vote<NID> },

    /// The log at a higher index is obsolete: `higher_index_log_id` is expected to be greater
    /// than `lower_index_log_id`.
    #[error("log at higher index is obsolete: {higher_index_log_id:?} should GT {lower_index_log_id:?}")]
    DirtyLog {
        higher_index_log_id: LogId<NID>,
        lower_index_log_id: LogId<NID>,
    },

    /// A log at index `want` was requested but `got` was what came back.
    #[error("try to get log at index {want} but got {got:?}")]
    LogIndexNotFound { want: u64, got: Option<u64> },

    /// The range given by `start` and `end` is empty.
    #[error("range is empty: start: {start:?}, end: {end:?}")]
    RangeEmpty { start: Option<u64>, end: Option<u64> },

    /// The range given by the bounds `start` and `end` is not half-open.
    #[error("range is not half-open: start: {start:?}, end: {end:?}")]
    RangeNotHalfOpen { start: Bound<u64>, end: Bound<u64> },

    /// An empty log vector was given.
    // TODO(xp): rename this to some input related error name.
    #[error("empty log vector")]
    LogsEmpty,

    /// All logs are removed; at least one log is required to track continuity.
    #[error("all logs are removed. It requires at least one log to track continuity")]
    StoreLogsEmpty,

    /// Logs are not consecutive: `next` does not directly follow `prev`.
    #[error("logs are not consecutive, prev: {prev:?}, next: {next}")]
    LogsNonConsecutive { prev: Option<LogId<NID>>, next: LogId<NID> },

    /// The next log to apply, `next`, is invalid with respect to the previous one, `prev`.
    #[error("invalid next log to apply: prev: {prev:?}, next: {next}")]
    ApplyNonConsecutive { prev: Option<LogId<NID>>, next: LogId<NID> },

    /// An applied log can not conflict: logs would be deleted since `first_conflict_log_id`
    /// while `last_applied` is the last applied log id.
    #[error("applied log can not conflict, last_applied: {last_applied:?}, delete since: {first_conflict_log_id}")]
    AppliedWontConflict {
        last_applied: Option<LogId<NID>>,
        first_conflict_log_id: LogId<NID>,
    },

    /// Purging non-applied logs is not allowed: logs would be purged up to `purge_upto` while
    /// `last_applied` is the last applied log id.
    #[error("not allowed to purge non-applied logs, last_applied: {last_applied:?}, purge upto: {purge_upto}")]
    PurgeNonApplied {
        last_applied: Option<LogId<NID>>,
        purge_upto: LogId<NID>,
    },
}

/// A storage error is either a defensive check error or an error that occurred when doing the
/// actual io operation.
///
/// Both variants are `#[error(transparent)]` and mark their field with `#[from]`, so a
/// `DefensiveError` or a `StorageIOError` converts into a `StorageError` via `From`/`into()`.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum StorageError<NID>
where NID: NodeId
{
    /// An error raised by defensive check.
    #[error(transparent)]
    Defensive {
        #[from]
        #[cfg_attr(feature = "bt", backtrace)]
        source: DefensiveError<NID>,
    },

    /// An error raised by io operation.
    #[error(transparent)]
    IO {
        #[from]
        #[cfg_attr(feature = "bt", backtrace)]
        source: StorageIOError<NID>,
    },
}

impl<NID> StorageError<NID>
where NID: NodeId
{
    /// Consume the error and return the inner `DefensiveError`.
    ///
    /// Returns `None` if this is the `IO` variant.
    pub fn into_defensive(self) -> Option<DefensiveError<NID>> {
        // Matched exhaustively on purpose: adding a variant to `StorageError` must force a
        // decision here instead of being silently mapped to `None` by a wildcard arm.
        match self {
            StorageError::Defensive { source } => Some(source),
            StorageError::IO { .. } => None,
        }
    }

    /// Consume the error and return the inner `StorageIOError`.
    ///
    /// Returns `None` if this is the `Defensive` variant.
    pub fn into_io(self) -> Option<StorageIOError<NID>> {
        // Exhaustive for the same reason as `into_defensive`.
        match self {
            StorageError::IO { source } => Some(source),
            StorageError::Defensive { .. } => None,
        }
    }

    /// Build a `StorageError::IO` from an `std::io::Error`.
    ///
    /// `subject` and `verb` describe what was being operated on and how; `io_error` is wrapped
    /// into an `AnyError` and becomes the source of the resulting `StorageIOError`.
    pub fn from_io_error(subject: ErrorSubject<NID>, verb: ErrorVerb, io_error: std::io::Error) -> Self {
        let source = StorageIOError::new(subject, verb, AnyError::new(&io_error));
        StorageError::IO { source }
    }
}

/// Error that occurs when operating the store.
///
/// It records what was being operated on (`subject`), the kind of operation (`verb`) and the
/// underlying error (`source`). Its `Display` output is `when <verb:?> <subject:?>: <source>`.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct StorageIOError<NID>
where NID: NodeId
{
    /// What the failed operation was about.
    subject: ErrorSubject<NID>,
    /// What kind of operation was being done when the error occurred.
    verb: ErrorVerb,
    /// The underlying error.
    source: AnyError,
    /// Optional backtrace rendered as a string; `StorageIOError::new` fills it with the result
    /// of `anyerror::backtrace_str()`.
    backtrace: Option<String>,
}

impl<NID> std::fmt::Display for StorageIOError<NID>
where NID: NodeId
{
    /// Render as `when <verb:?> <subject:?>: <source>`; the backtrace is not included.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            subject, verb, source, ..
        } = self;

        write!(f, "when {:?} {:?}: {}", verb, subject, source)
    }
}

impl<NID> StorageIOError<NID>
where NID: NodeId
{
    pub fn new(subject: ErrorSubject<NID>, verb: ErrorVerb, source: AnyError) -> Self {
        Self {
            subject,
            verb,
            source,
            backtrace: anyerror::backtrace_str(),
        }
    }
}