Skip to main content

s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    record::{FencingToken, SeqNum, StreamPosition},
5    types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12    #[error("deserialization: {0}")]
13    Deserialization(#[from] kv::DeserializationError),
14    #[error("database: {0}")]
15    Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19    fn from(error: slatedb::Error) -> Self {
20        StorageError::Database(Arc::new(error))
21    }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27    pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33    pub basin: BasinName,
34    pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40    pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46    pub basin: BasinName,
47    pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53    pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59    pub basin: BasinName,
60    pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85    #[error(transparent)]
86    Storage(#[from] StorageError),
87    #[error(transparent)]
88    StreamNotFound(#[from] StreamNotFoundError),
89    #[error(transparent)]
90    StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95    #[error(transparent)]
96    Storage(#[from] StorageError),
97    #[error(transparent)]
98    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
99    #[error(transparent)]
100    RequestDroppedError(#[from] RequestDroppedError),
101    #[error(transparent)]
102    ConditionFailed(#[from] AppendConditionFailedError),
103    #[error(transparent)]
104    TimestampMissing(#[from] AppendTimestampRequiredError),
105}
106
107#[derive(Debug, Clone, thiserror::Error)]
108pub enum CheckTailError {
109    #[error(transparent)]
110    Storage(#[from] StorageError),
111    #[error(transparent)]
112    TransactionConflict(#[from] TransactionConflictError),
113    #[error(transparent)]
114    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
115    #[error(transparent)]
116    BasinNotFound(#[from] BasinNotFoundError),
117    #[error(transparent)]
118    StreamNotFound(#[from] StreamNotFoundError),
119    #[error(transparent)]
120    BasinDeletionPending(#[from] BasinDeletionPendingError),
121    #[error(transparent)]
122    StreamDeletionPending(#[from] StreamDeletionPendingError),
123}
124
125impl From<StreamerError> for CheckTailError {
126    fn from(e: StreamerError) -> Self {
127        match e {
128            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
129            StreamerError::Storage(e) => Self::Storage(e),
130            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
131        }
132    }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136pub enum AppendError {
137    #[error(transparent)]
138    Storage(#[from] StorageError),
139    #[error(transparent)]
140    TransactionConflict(#[from] TransactionConflictError),
141    #[error(transparent)]
142    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
143    #[error(transparent)]
144    RequestDroppedError(#[from] RequestDroppedError),
145    #[error(transparent)]
146    BasinNotFound(#[from] BasinNotFoundError),
147    #[error(transparent)]
148    StreamNotFound(#[from] StreamNotFoundError),
149    #[error(transparent)]
150    BasinDeletionPending(#[from] BasinDeletionPendingError),
151    #[error(transparent)]
152    StreamDeletionPending(#[from] StreamDeletionPendingError),
153    #[error(transparent)]
154    ConditionFailed(#[from] AppendConditionFailedError),
155    #[error(transparent)]
156    TimestampMissing(#[from] AppendTimestampRequiredError),
157}
158
159impl From<AppendErrorInternal> for AppendError {
160    fn from(e: AppendErrorInternal) -> Self {
161        match e {
162            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
163            AppendErrorInternal::StreamerMissingInActionError(e) => {
164                AppendError::StreamerMissingInActionError(e)
165            }
166            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
167            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
168            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
169        }
170    }
171}
172
173#[derive(Debug, Clone, thiserror::Error)]
174pub enum AppendConditionFailedError {
175    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
176    FencingTokenMismatch {
177        expected: FencingToken,
178        actual: FencingToken,
179        applied_point: RangeTo<SeqNum>,
180    },
181    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
182    SeqNumMismatch {
183        assigned_seq_num: SeqNum,
184        match_seq_num: SeqNum,
185    },
186}
187
188impl AppendConditionFailedError {
189    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
190        use AppendConditionFailedError::*;
191        match self {
192            SeqNumMismatch {
193                assigned_seq_num, ..
194            } => ..*assigned_seq_num,
195            FencingTokenMismatch { applied_point, .. } => *applied_point,
196        }
197    }
198}
199
200impl From<StreamerError> for AppendError {
201    fn from(e: StreamerError) -> Self {
202        match e {
203            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
204            StreamerError::Storage(e) => Self::Storage(e),
205            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
206        }
207    }
208}
209
210#[derive(Debug, Clone, thiserror::Error)]
211pub enum ReadError {
212    #[error(transparent)]
213    Storage(#[from] StorageError),
214    #[error(transparent)]
215    TransactionConflict(#[from] TransactionConflictError),
216    #[error(transparent)]
217    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
218    #[error(transparent)]
219    BasinNotFound(#[from] BasinNotFoundError),
220    #[error(transparent)]
221    StreamNotFound(#[from] StreamNotFoundError),
222    #[error(transparent)]
223    BasinDeletionPending(#[from] BasinDeletionPendingError),
224    #[error(transparent)]
225    StreamDeletionPending(#[from] StreamDeletionPendingError),
226    #[error(transparent)]
227    Unwritten(#[from] UnwrittenError),
228}
229
230impl From<StreamerError> for ReadError {
231    fn from(e: StreamerError) -> Self {
232        match e {
233            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
234            StreamerError::Storage(e) => Self::Storage(e),
235            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
236        }
237    }
238}
239
240impl From<kv::DeserializationError> for ReadError {
241    fn from(e: kv::DeserializationError) -> Self {
242        Self::Storage(e.into())
243    }
244}
245
246impl From<slatedb::Error> for ReadError {
247    fn from(e: slatedb::Error) -> Self {
248        Self::Storage(e.into())
249    }
250}
251
252#[derive(Debug, Clone, thiserror::Error)]
253pub enum ListStreamsError {
254    #[error(transparent)]
255    Storage(#[from] StorageError),
256}
257
258impl From<slatedb::Error> for ListStreamsError {
259    fn from(e: slatedb::Error) -> Self {
260        Self::Storage(e.into())
261    }
262}
263
264impl From<kv::DeserializationError> for ListStreamsError {
265    fn from(e: kv::DeserializationError) -> Self {
266        Self::Storage(e.into())
267    }
268}
269
270#[derive(Debug, Clone, thiserror::Error)]
271pub enum CreateStreamError {
272    #[error(transparent)]
273    Storage(#[from] StorageError),
274    #[error(transparent)]
275    TransactionConflict(#[from] TransactionConflictError),
276    #[error(transparent)]
277    BasinNotFound(#[from] BasinNotFoundError),
278    #[error(transparent)]
279    BasinDeletionPending(#[from] BasinDeletionPendingError),
280    #[error(transparent)]
281    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
282    #[error(transparent)]
283    StreamDeletionPending(#[from] StreamDeletionPendingError),
284}
285
286impl From<slatedb::Error> for CreateStreamError {
287    fn from(err: slatedb::Error) -> Self {
288        if err.kind() == slatedb::ErrorKind::Transaction {
289            Self::TransactionConflict(TransactionConflictError)
290        } else {
291            Self::Storage(err.into())
292        }
293    }
294}
295
296impl From<GetBasinConfigError> for CreateStreamError {
297    fn from(err: GetBasinConfigError) -> Self {
298        match err {
299            GetBasinConfigError::Storage(e) => Self::Storage(e),
300            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
301        }
302    }
303}
304
305#[derive(Debug, Clone, thiserror::Error)]
306pub enum GetStreamConfigError {
307    #[error(transparent)]
308    Storage(#[from] StorageError),
309    #[error(transparent)]
310    StreamNotFound(#[from] StreamNotFoundError),
311    #[error(transparent)]
312    StreamDeletionPending(#[from] StreamDeletionPendingError),
313}
314
315#[derive(Debug, Clone, thiserror::Error)]
316pub enum DeleteStreamError {
317    #[error(transparent)]
318    Storage(#[from] StorageError),
319    #[error(transparent)]
320    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
321    #[error(transparent)]
322    RequestDroppedError(#[from] RequestDroppedError),
323    #[error(transparent)]
324    StreamNotFound(#[from] StreamNotFoundError),
325}
326
327impl From<slatedb::Error> for DeleteStreamError {
328    fn from(err: slatedb::Error) -> Self {
329        Self::Storage(err.into())
330    }
331}
332
333#[derive(Debug, Clone, thiserror::Error)]
334pub enum BasinDeletionError {
335    #[error(transparent)]
336    Storage(#[from] StorageError),
337    #[error(transparent)]
338    DeleteStream(#[from] DeleteStreamError),
339}
340
341#[derive(Debug, Clone, thiserror::Error)]
342pub enum StreamDeleteOnEmptyError {
343    #[error(transparent)]
344    Storage(#[from] StorageError),
345    #[error(transparent)]
346    DeleteStream(#[from] DeleteStreamError),
347}
348
349#[derive(Debug, Clone, thiserror::Error)]
350pub enum ListBasinsError {
351    #[error(transparent)]
352    Storage(#[from] StorageError),
353}
354
355impl From<slatedb::Error> for ListBasinsError {
356    fn from(err: slatedb::Error) -> Self {
357        Self::Storage(err.into())
358    }
359}
360
361impl From<kv::DeserializationError> for ListBasinsError {
362    fn from(e: kv::DeserializationError) -> Self {
363        Self::Storage(e.into())
364    }
365}
366
367#[derive(Debug, Clone, thiserror::Error)]
368pub enum CreateBasinError {
369    #[error(transparent)]
370    Storage(#[from] StorageError),
371    #[error(transparent)]
372    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
373    #[error(transparent)]
374    BasinDeletionPending(#[from] BasinDeletionPendingError),
375}
376
377impl From<slatedb::Error> for CreateBasinError {
378    fn from(err: slatedb::Error) -> Self {
379        Self::Storage(err.into())
380    }
381}
382
383#[derive(Debug, Clone, thiserror::Error)]
384pub enum GetBasinConfigError {
385    #[error(transparent)]
386    Storage(#[from] StorageError),
387    #[error(transparent)]
388    BasinNotFound(#[from] BasinNotFoundError),
389}
390
391#[derive(Debug, Clone, thiserror::Error)]
392pub enum ReconfigureBasinError {
393    #[error(transparent)]
394    Storage(#[from] StorageError),
395    #[error(transparent)]
396    TransactionConflict(#[from] TransactionConflictError),
397    #[error(transparent)]
398    BasinNotFound(#[from] BasinNotFoundError),
399    #[error(transparent)]
400    BasinDeletionPending(#[from] BasinDeletionPendingError),
401}
402
403impl From<slatedb::Error> for ReconfigureBasinError {
404    fn from(err: slatedb::Error) -> Self {
405        if err.kind() == slatedb::ErrorKind::Transaction {
406            Self::TransactionConflict(TransactionConflictError)
407        } else {
408            Self::Storage(err.into())
409        }
410    }
411}
412
413#[derive(Debug, Clone, thiserror::Error)]
414pub enum ReconfigureStreamError {
415    #[error(transparent)]
416    Storage(#[from] StorageError),
417    #[error(transparent)]
418    TransactionConflict(#[from] TransactionConflictError),
419    #[error(transparent)]
420    StreamNotFound(#[from] StreamNotFoundError),
421    #[error(transparent)]
422    StreamDeletionPending(#[from] StreamDeletionPendingError),
423}
424
425impl From<slatedb::Error> for ReconfigureStreamError {
426    fn from(err: slatedb::Error) -> Self {
427        if err.kind() == slatedb::ErrorKind::Transaction {
428            Self::TransactionConflict(TransactionConflictError)
429        } else {
430            Self::Storage(err.into())
431        }
432    }
433}
434
435#[derive(Debug, Clone, thiserror::Error)]
436pub enum DeleteBasinError {
437    #[error(transparent)]
438    Storage(#[from] StorageError),
439    #[error(transparent)]
440    BasinNotFound(#[from] BasinNotFoundError),
441}
442
443impl From<slatedb::Error> for DeleteBasinError {
444    fn from(err: slatedb::Error) -> Self {
445        Self::Storage(err.into())
446    }
447}