s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    record::{FencingToken, SeqNum, StreamPosition},
5    types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12    #[error("deserialization: {0}")]
13    Deserialization(#[from] kv::DeserializationError),
14    #[error("database: {0}")]
15    Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19    fn from(error: slatedb::Error) -> Self {
20        StorageError::Database(Arc::new(error))
21    }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27    pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33    pub basin: BasinName,
34    pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40    pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46    pub basin: BasinName,
47    pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53    pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59    pub basin: BasinName,
60    pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68pub enum UnavailableError {
69    #[error("unavailable: missing in action")]
70    MissingInAction,
71    #[error("unavailable: request drop")]
72    RequestDrop,
73}
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85    #[error(transparent)]
86    Storage(#[from] StorageError),
87    #[error(transparent)]
88    StreamNotFound(#[from] StreamNotFoundError),
89    #[error(transparent)]
90    StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95    #[error(transparent)]
96    Storage(#[from] StorageError),
97    #[error(transparent)]
98    Unavailable(#[from] UnavailableError),
99    #[error(transparent)]
100    ConditionFailed(#[from] AppendConditionFailedError),
101    #[error(transparent)]
102    TimestampMissing(#[from] AppendTimestampRequiredError),
103}
104
105#[derive(Debug, Clone, thiserror::Error)]
106pub enum CheckTailError {
107    #[error(transparent)]
108    Storage(#[from] StorageError),
109    #[error(transparent)]
110    TransactionConflict(#[from] TransactionConflictError),
111    #[error(transparent)]
112    Unavailable(#[from] UnavailableError),
113    #[error(transparent)]
114    BasinNotFound(#[from] BasinNotFoundError),
115    #[error(transparent)]
116    StreamNotFound(#[from] StreamNotFoundError),
117    #[error(transparent)]
118    BasinDeletionPending(#[from] BasinDeletionPendingError),
119    #[error(transparent)]
120    StreamDeletionPending(#[from] StreamDeletionPendingError),
121}
122
123impl From<StreamerError> for CheckTailError {
124    fn from(e: StreamerError) -> Self {
125        match e {
126            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
127            StreamerError::Storage(e) => Self::Storage(e),
128            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
129        }
130    }
131}
132
133#[derive(Debug, Clone, thiserror::Error)]
134pub enum AppendError {
135    #[error(transparent)]
136    Storage(#[from] StorageError),
137    #[error(transparent)]
138    TransactionConflict(#[from] TransactionConflictError),
139    #[error(transparent)]
140    Unavailable(#[from] UnavailableError),
141    #[error(transparent)]
142    BasinNotFound(#[from] BasinNotFoundError),
143    #[error(transparent)]
144    StreamNotFound(#[from] StreamNotFoundError),
145    #[error(transparent)]
146    BasinDeletionPending(#[from] BasinDeletionPendingError),
147    #[error(transparent)]
148    StreamDeletionPending(#[from] StreamDeletionPendingError),
149    #[error(transparent)]
150    ConditionFailed(#[from] AppendConditionFailedError),
151    #[error(transparent)]
152    TimestampMissing(#[from] AppendTimestampRequiredError),
153}
154
155impl From<AppendErrorInternal> for AppendError {
156    fn from(e: AppendErrorInternal) -> Self {
157        match e {
158            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
159            AppendErrorInternal::Unavailable(e) => AppendError::Unavailable(e),
160            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
161            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
162        }
163    }
164}
165
166#[derive(Debug, Clone, thiserror::Error)]
167pub enum AppendConditionFailedError {
168    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
169    FencingTokenMismatch {
170        expected: FencingToken,
171        actual: FencingToken,
172        applied_point: RangeTo<SeqNum>,
173    },
174    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
175    SeqNumMismatch {
176        assigned_seq_num: SeqNum,
177        match_seq_num: SeqNum,
178    },
179}
180
181impl AppendConditionFailedError {
182    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
183        use AppendConditionFailedError::*;
184        match self {
185            SeqNumMismatch {
186                assigned_seq_num, ..
187            } => ..*assigned_seq_num,
188            FencingTokenMismatch { applied_point, .. } => *applied_point,
189        }
190    }
191}
192
193impl From<StreamerError> for AppendError {
194    fn from(e: StreamerError) -> Self {
195        match e {
196            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
197            StreamerError::Storage(e) => Self::Storage(e),
198            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
199        }
200    }
201}
202
203#[derive(Debug, Clone, thiserror::Error)]
204pub enum ReadError {
205    #[error(transparent)]
206    Storage(#[from] StorageError),
207    #[error(transparent)]
208    TransactionConflict(#[from] TransactionConflictError),
209    #[error(transparent)]
210    Unavailable(#[from] UnavailableError),
211    #[error(transparent)]
212    BasinNotFound(#[from] BasinNotFoundError),
213    #[error(transparent)]
214    StreamNotFound(#[from] StreamNotFoundError),
215    #[error(transparent)]
216    BasinDeletionPending(#[from] BasinDeletionPendingError),
217    #[error(transparent)]
218    StreamDeletionPending(#[from] StreamDeletionPendingError),
219    #[error(transparent)]
220    Unwritten(#[from] UnwrittenError),
221}
222
223impl From<StreamerError> for ReadError {
224    fn from(e: StreamerError) -> Self {
225        match e {
226            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
227            StreamerError::Storage(e) => Self::Storage(e),
228            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
229        }
230    }
231}
232
233impl From<kv::DeserializationError> for ReadError {
234    fn from(e: kv::DeserializationError) -> Self {
235        Self::Storage(e.into())
236    }
237}
238
239impl From<slatedb::Error> for ReadError {
240    fn from(e: slatedb::Error) -> Self {
241        Self::Storage(e.into())
242    }
243}
244
245#[derive(Debug, Clone, thiserror::Error)]
246pub enum ListStreamsError {
247    #[error(transparent)]
248    Storage(#[from] StorageError),
249}
250
251impl From<slatedb::Error> for ListStreamsError {
252    fn from(e: slatedb::Error) -> Self {
253        Self::Storage(e.into())
254    }
255}
256
257impl From<kv::DeserializationError> for ListStreamsError {
258    fn from(e: kv::DeserializationError) -> Self {
259        Self::Storage(e.into())
260    }
261}
262
263#[derive(Debug, Clone, thiserror::Error)]
264pub enum CreateStreamError {
265    #[error(transparent)]
266    Storage(#[from] StorageError),
267    #[error(transparent)]
268    TransactionConflict(#[from] TransactionConflictError),
269    #[error(transparent)]
270    Unavailable(#[from] UnavailableError),
271    #[error(transparent)]
272    BasinNotFound(#[from] BasinNotFoundError),
273    #[error(transparent)]
274    BasinDeletionPending(#[from] BasinDeletionPendingError),
275    #[error(transparent)]
276    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
277    #[error(transparent)]
278    StreamDeletionPending(#[from] StreamDeletionPendingError),
279}
280
281impl From<slatedb::Error> for CreateStreamError {
282    fn from(err: slatedb::Error) -> Self {
283        if err.kind() == slatedb::ErrorKind::Transaction {
284            Self::TransactionConflict(TransactionConflictError)
285        } else {
286            Self::Storage(err.into())
287        }
288    }
289}
290
291impl From<GetBasinConfigError> for CreateStreamError {
292    fn from(err: GetBasinConfigError) -> Self {
293        match err {
294            GetBasinConfigError::Storage(e) => Self::Storage(e),
295            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
296        }
297    }
298}
299
300#[derive(Debug, Clone, thiserror::Error)]
301pub enum GetStreamConfigError {
302    #[error(transparent)]
303    Storage(#[from] StorageError),
304    #[error(transparent)]
305    StreamNotFound(#[from] StreamNotFoundError),
306}
307
308#[derive(Debug, Clone, thiserror::Error)]
309pub enum DeleteStreamError {
310    #[error(transparent)]
311    Storage(#[from] StorageError),
312    #[error(transparent)]
313    Unavailable(#[from] UnavailableError),
314    #[error(transparent)]
315    StreamNotFound(#[from] StreamNotFoundError),
316}
317
318impl From<slatedb::Error> for DeleteStreamError {
319    fn from(err: slatedb::Error) -> Self {
320        Self::Storage(err.into())
321    }
322}
323
324#[derive(Debug, Clone, thiserror::Error)]
325pub enum ListBasinsError {
326    #[error(transparent)]
327    Storage(#[from] StorageError),
328}
329
330impl From<slatedb::Error> for ListBasinsError {
331    fn from(err: slatedb::Error) -> Self {
332        Self::Storage(err.into())
333    }
334}
335
336impl From<kv::DeserializationError> for ListBasinsError {
337    fn from(e: kv::DeserializationError) -> Self {
338        Self::Storage(e.into())
339    }
340}
341
342#[derive(Debug, Clone, thiserror::Error)]
343pub enum CreateBasinError {
344    #[error(transparent)]
345    Storage(#[from] StorageError),
346    #[error(transparent)]
347    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
348    #[error(transparent)]
349    BasinDeletionPending(#[from] BasinDeletionPendingError),
350}
351
352impl From<slatedb::Error> for CreateBasinError {
353    fn from(err: slatedb::Error) -> Self {
354        Self::Storage(err.into())
355    }
356}
357
358#[derive(Debug, Clone, thiserror::Error)]
359pub enum GetBasinConfigError {
360    #[error(transparent)]
361    Storage(#[from] StorageError),
362    #[error(transparent)]
363    BasinNotFound(#[from] BasinNotFoundError),
364}
365
366#[derive(Debug, Clone, thiserror::Error)]
367pub enum ReconfigureBasinError {
368    #[error(transparent)]
369    Storage(#[from] StorageError),
370    #[error(transparent)]
371    TransactionConflict(#[from] TransactionConflictError),
372    #[error(transparent)]
373    BasinNotFound(#[from] BasinNotFoundError),
374    #[error(transparent)]
375    BasinDeletionPending(#[from] BasinDeletionPendingError),
376}
377
378impl From<slatedb::Error> for ReconfigureBasinError {
379    fn from(err: slatedb::Error) -> Self {
380        if err.kind() == slatedb::ErrorKind::Transaction {
381            Self::TransactionConflict(TransactionConflictError)
382        } else {
383            Self::Storage(err.into())
384        }
385    }
386}
387
388#[derive(Debug, Clone, thiserror::Error)]
389pub enum ReconfigureStreamError {
390    #[error(transparent)]
391    Storage(#[from] StorageError),
392    #[error(transparent)]
393    TransactionConflict(#[from] TransactionConflictError),
394    #[error(transparent)]
395    Unavailable(#[from] UnavailableError),
396    #[error(transparent)]
397    StreamNotFound(#[from] StreamNotFoundError),
398    #[error(transparent)]
399    StreamDeletionPending(#[from] StreamDeletionPendingError),
400}
401
402impl From<slatedb::Error> for ReconfigureStreamError {
403    fn from(err: slatedb::Error) -> Self {
404        if err.kind() == slatedb::ErrorKind::Transaction {
405            Self::TransactionConflict(TransactionConflictError)
406        } else {
407            Self::Storage(err.into())
408        }
409    }
410}
411
412#[derive(Debug, Clone, thiserror::Error)]
413pub enum DeleteBasinError {
414    #[error(transparent)]
415    Storage(#[from] StorageError),
416    #[error(transparent)]
417    BasinNotFound(#[from] BasinNotFoundError),
418}
419
420impl From<slatedb::Error> for DeleteBasinError {
421    fn from(err: slatedb::Error) -> Self {
422        Self::Storage(err.into())
423    }
424}