Skip to main content

s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    encryption::EncryptionSpecResolutionError,
5    record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6    types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13    #[error("deserialization: {0}")]
14    Deserialization(#[from] kv::DeserializationError),
15    #[error("database: {0}")]
16    Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20    fn from(error: slatedb::Error) -> Self {
21        StorageError::Database(Arc::new(error))
22    }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28    pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34    pub basin: BasinName,
35    pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41    pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47    pub basin: BasinName,
48    pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54    pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream deletion pending")]
59pub struct StreamDeletionPendingError;
60
61#[derive(Debug, Clone, thiserror::Error)]
62#[error("unwritten position: {0}")]
63pub struct UnwrittenError(pub StreamPosition);
64
65#[derive(Debug, Clone, thiserror::Error)]
66#[error("streamer missing in action")]
67pub struct StreamerMissingInActionError;
68
69#[derive(Debug, Clone, thiserror::Error)]
70#[error("request dropped")]
71pub struct RequestDroppedError;
72
73#[derive(Debug, Clone, thiserror::Error)]
74#[error("record timestamp was required but was missing")]
75pub struct AppendTimestampRequiredError;
76
77#[derive(Debug, Clone, thiserror::Error)]
78#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
79pub struct MaxSeqNumError {
80    pub first_seq_num: SeqNum,
81    pub assigned_seq_num: SeqNum,
82    pub max_assignable_seq_num: SeqNum,
83}
84
85#[derive(Debug, Clone, thiserror::Error)]
86#[error("transaction conflict occurred – this is usually retriable")]
87pub struct TransactionConflictError;
88
89#[derive(Debug, Clone, thiserror::Error)]
90pub enum StreamerError {
91    #[error(transparent)]
92    Storage(#[from] StorageError),
93    #[error(transparent)]
94    StreamNotFound(#[from] StreamNotFoundError),
95    #[error(transparent)]
96    StreamDeletionPending(#[from] StreamDeletionPendingError),
97}
98
99#[derive(Debug, Clone, thiserror::Error)]
100pub(super) enum AppendErrorInternal {
101    #[error(transparent)]
102    Storage(#[from] StorageError),
103    #[error(transparent)]
104    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
105    #[error(transparent)]
106    RequestDroppedError(#[from] RequestDroppedError),
107    #[error(transparent)]
108    StreamDeletionPending(#[from] StreamDeletionPendingError),
109    #[error(transparent)]
110    ConditionFailed(#[from] AppendConditionFailedError),
111    #[error(transparent)]
112    TimestampMissing(#[from] AppendTimestampRequiredError),
113    #[error(transparent)]
114    MaxSeqNum(#[from] MaxSeqNumError),
115}
116
117impl AppendErrorInternal {
118    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
119        match self {
120            Self::ConditionFailed(e) => e.durability_dependency(),
121            Self::MaxSeqNum(e) => e.durability_dependency(),
122            _ => ..0,
123        }
124    }
125}
126
127#[derive(Debug, Clone, thiserror::Error)]
128pub enum CheckTailError {
129    #[error(transparent)]
130    Storage(#[from] StorageError),
131    #[error(transparent)]
132    TransactionConflict(#[from] TransactionConflictError),
133    #[error(transparent)]
134    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
135    #[error(transparent)]
136    BasinNotFound(#[from] BasinNotFoundError),
137    #[error(transparent)]
138    StreamNotFound(#[from] StreamNotFoundError),
139    #[error(transparent)]
140    BasinDeletionPending(#[from] BasinDeletionPendingError),
141    #[error(transparent)]
142    StreamDeletionPending(#[from] StreamDeletionPendingError),
143}
144
145impl From<StreamerError> for CheckTailError {
146    fn from(e: StreamerError) -> Self {
147        match e {
148            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
149            StreamerError::Storage(e) => Self::Storage(e),
150            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
151        }
152    }
153}
154
155#[derive(Debug, Clone, thiserror::Error)]
156pub enum AppendError {
157    #[error(transparent)]
158    Storage(#[from] StorageError),
159    #[error(transparent)]
160    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
161    #[error(transparent)]
162    TransactionConflict(#[from] TransactionConflictError),
163    #[error(transparent)]
164    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
165    #[error(transparent)]
166    RequestDroppedError(#[from] RequestDroppedError),
167    #[error(transparent)]
168    BasinNotFound(#[from] BasinNotFoundError),
169    #[error(transparent)]
170    StreamNotFound(#[from] StreamNotFoundError),
171    #[error(transparent)]
172    BasinDeletionPending(#[from] BasinDeletionPendingError),
173    #[error(transparent)]
174    StreamDeletionPending(#[from] StreamDeletionPendingError),
175    #[error(transparent)]
176    ConditionFailed(#[from] AppendConditionFailedError),
177    #[error(transparent)]
178    TimestampMissing(#[from] AppendTimestampRequiredError),
179    #[error(transparent)]
180    MaxSeqNum(#[from] MaxSeqNumError),
181}
182
183impl From<AppendErrorInternal> for AppendError {
184    fn from(e: AppendErrorInternal) -> Self {
185        match e {
186            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
187            AppendErrorInternal::StreamerMissingInActionError(e) => {
188                AppendError::StreamerMissingInActionError(e)
189            }
190            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
191            AppendErrorInternal::StreamDeletionPending(e) => AppendError::StreamDeletionPending(e),
192            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
193            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
194            AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
195        }
196    }
197}
198
199#[derive(Debug, Clone, thiserror::Error)]
200pub enum AppendConditionFailedError {
201    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
202    FencingTokenMismatch {
203        expected: FencingToken,
204        actual: FencingToken,
205        applied_point: RangeTo<SeqNum>,
206    },
207    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
208    SeqNumMismatch {
209        assigned_seq_num: SeqNum,
210        match_seq_num: SeqNum,
211    },
212}
213
214impl AppendConditionFailedError {
215    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
216        use AppendConditionFailedError::*;
217        match self {
218            SeqNumMismatch {
219                assigned_seq_num, ..
220            } => ..*assigned_seq_num,
221            FencingTokenMismatch { applied_point, .. } => *applied_point,
222        }
223    }
224}
225
226impl MaxSeqNumError {
227    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
228        ..self.first_seq_num
229    }
230}
231
232impl From<StreamerError> for AppendError {
233    fn from(e: StreamerError) -> Self {
234        match e {
235            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
236            StreamerError::Storage(e) => Self::Storage(e),
237            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
238        }
239    }
240}
241
242#[derive(Debug, Clone, thiserror::Error)]
243pub enum ReadError {
244    #[error(transparent)]
245    Storage(#[from] StorageError),
246    #[error(transparent)]
247    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
248    #[error(transparent)]
249    RecordDecryption(#[from] RecordDecryptionError),
250    #[error(transparent)]
251    TransactionConflict(#[from] TransactionConflictError),
252    #[error(transparent)]
253    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
254    #[error(transparent)]
255    BasinNotFound(#[from] BasinNotFoundError),
256    #[error(transparent)]
257    StreamNotFound(#[from] StreamNotFoundError),
258    #[error(transparent)]
259    BasinDeletionPending(#[from] BasinDeletionPendingError),
260    #[error(transparent)]
261    StreamDeletionPending(#[from] StreamDeletionPendingError),
262    #[error(transparent)]
263    Unwritten(#[from] UnwrittenError),
264}
265
266impl From<StreamerError> for ReadError {
267    fn from(e: StreamerError) -> Self {
268        match e {
269            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
270            StreamerError::Storage(e) => Self::Storage(e),
271            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
272        }
273    }
274}
275
276impl From<kv::DeserializationError> for ReadError {
277    fn from(e: kv::DeserializationError) -> Self {
278        Self::Storage(e.into())
279    }
280}
281
282impl From<slatedb::Error> for ReadError {
283    fn from(e: slatedb::Error) -> Self {
284        Self::Storage(e.into())
285    }
286}
287
288#[derive(Debug, Clone, thiserror::Error)]
289pub enum ListStreamsError {
290    #[error(transparent)]
291    Storage(#[from] StorageError),
292}
293
294impl From<slatedb::Error> for ListStreamsError {
295    fn from(e: slatedb::Error) -> Self {
296        Self::Storage(e.into())
297    }
298}
299
300impl From<kv::DeserializationError> for ListStreamsError {
301    fn from(e: kv::DeserializationError) -> Self {
302        Self::Storage(e.into())
303    }
304}
305
306#[derive(Debug, Clone, thiserror::Error)]
307pub enum ProvisionStreamError {
308    #[error(transparent)]
309    Storage(#[from] StorageError),
310    #[error(transparent)]
311    TransactionConflict(#[from] TransactionConflictError),
312    #[error(transparent)]
313    BasinNotFound(#[from] BasinNotFoundError),
314    #[error(transparent)]
315    BasinDeletionPending(#[from] BasinDeletionPendingError),
316    #[error(transparent)]
317    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
318    #[error(transparent)]
319    StreamDeletionPending(#[from] StreamDeletionPendingError),
320    #[error(transparent)]
321    Validation(#[from] s2_common::types::ValidationError),
322}
323
324impl From<slatedb::Error> for ProvisionStreamError {
325    fn from(err: slatedb::Error) -> Self {
326        if err.kind() == slatedb::ErrorKind::Transaction {
327            Self::TransactionConflict(TransactionConflictError)
328        } else {
329            Self::Storage(err.into())
330        }
331    }
332}
333
334impl From<GetBasinConfigError> for ProvisionStreamError {
335    fn from(err: GetBasinConfigError) -> Self {
336        match err {
337            GetBasinConfigError::Storage(e) => Self::Storage(e),
338            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
339        }
340    }
341}
342
343#[derive(Debug, Clone, thiserror::Error)]
344pub enum GetStreamConfigError {
345    #[error(transparent)]
346    Storage(#[from] StorageError),
347    #[error(transparent)]
348    StreamNotFound(#[from] StreamNotFoundError),
349    #[error(transparent)]
350    StreamDeletionPending(#[from] StreamDeletionPendingError),
351}
352
353#[derive(Debug, Clone, thiserror::Error)]
354pub enum DeleteStreamError {
355    #[error(transparent)]
356    Storage(#[from] StorageError),
357    #[error(transparent)]
358    TransactionConflict(#[from] TransactionConflictError),
359    #[error(transparent)]
360    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
361    #[error(transparent)]
362    RequestDroppedError(#[from] RequestDroppedError),
363    #[error(transparent)]
364    StreamNotFound(#[from] StreamNotFoundError),
365}
366
367impl From<slatedb::Error> for DeleteStreamError {
368    fn from(err: slatedb::Error) -> Self {
369        if err.kind() == slatedb::ErrorKind::Transaction {
370            Self::TransactionConflict(TransactionConflictError)
371        } else {
372            Self::Storage(err.into())
373        }
374    }
375}
376
377#[derive(Debug, Clone, thiserror::Error)]
378pub enum BasinDeletionError {
379    #[error(transparent)]
380    Storage(#[from] StorageError),
381    #[error(transparent)]
382    DeleteStream(#[from] DeleteStreamError),
383}
384
385#[derive(Debug, Clone, thiserror::Error)]
386pub enum StreamDeleteOnEmptyError {
387    #[error(transparent)]
388    Storage(#[from] StorageError),
389    #[error(transparent)]
390    DeleteStream(#[from] DeleteStreamError),
391}
392
393#[derive(Debug, Clone, thiserror::Error)]
394pub enum ListBasinsError {
395    #[error(transparent)]
396    Storage(#[from] StorageError),
397}
398
399impl From<slatedb::Error> for ListBasinsError {
400    fn from(err: slatedb::Error) -> Self {
401        Self::Storage(err.into())
402    }
403}
404
405impl From<kv::DeserializationError> for ListBasinsError {
406    fn from(e: kv::DeserializationError) -> Self {
407        Self::Storage(e.into())
408    }
409}
410
411#[derive(Debug, Clone, thiserror::Error)]
412pub enum ProvisionBasinError {
413    #[error(transparent)]
414    Storage(#[from] StorageError),
415    #[error(transparent)]
416    TransactionConflict(#[from] TransactionConflictError),
417    #[error(transparent)]
418    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
419    #[error(transparent)]
420    BasinDeletionPending(#[from] BasinDeletionPendingError),
421}
422
423impl From<slatedb::Error> for ProvisionBasinError {
424    fn from(err: slatedb::Error) -> Self {
425        if err.kind() == slatedb::ErrorKind::Transaction {
426            Self::TransactionConflict(TransactionConflictError)
427        } else {
428            Self::Storage(err.into())
429        }
430    }
431}
432
433#[derive(Debug, Clone, thiserror::Error)]
434pub enum GetBasinConfigError {
435    #[error(transparent)]
436    Storage(#[from] StorageError),
437    #[error(transparent)]
438    BasinNotFound(#[from] BasinNotFoundError),
439}
440
441#[derive(Debug, Clone, thiserror::Error)]
442pub enum ReconfigureBasinError {
443    #[error(transparent)]
444    Storage(#[from] StorageError),
445    #[error(transparent)]
446    TransactionConflict(#[from] TransactionConflictError),
447    #[error(transparent)]
448    BasinNotFound(#[from] BasinNotFoundError),
449    #[error(transparent)]
450    BasinDeletionPending(#[from] BasinDeletionPendingError),
451}
452
453impl From<slatedb::Error> for ReconfigureBasinError {
454    fn from(err: slatedb::Error) -> Self {
455        if err.kind() == slatedb::ErrorKind::Transaction {
456            Self::TransactionConflict(TransactionConflictError)
457        } else {
458            Self::Storage(err.into())
459        }
460    }
461}
462
463#[derive(Debug, Clone, thiserror::Error)]
464pub enum ReconfigureStreamError {
465    #[error(transparent)]
466    Storage(#[from] StorageError),
467    #[error(transparent)]
468    TransactionConflict(#[from] TransactionConflictError),
469    #[error(transparent)]
470    BasinNotFound(#[from] BasinNotFoundError),
471    #[error(transparent)]
472    BasinDeletionPending(#[from] BasinDeletionPendingError),
473    #[error(transparent)]
474    StreamNotFound(#[from] StreamNotFoundError),
475    #[error(transparent)]
476    StreamDeletionPending(#[from] StreamDeletionPendingError),
477    #[error(transparent)]
478    Validation(#[from] s2_common::types::ValidationError),
479}
480
481impl From<slatedb::Error> for ReconfigureStreamError {
482    fn from(err: slatedb::Error) -> Self {
483        if err.kind() == slatedb::ErrorKind::Transaction {
484            Self::TransactionConflict(TransactionConflictError)
485        } else {
486            Self::Storage(err.into())
487        }
488    }
489}
490
491#[derive(Debug, Clone, thiserror::Error)]
492pub enum DeleteBasinError {
493    #[error(transparent)]
494    Storage(#[from] StorageError),
495    #[error(transparent)]
496    TransactionConflict(#[from] TransactionConflictError),
497    #[error(transparent)]
498    BasinNotFound(#[from] BasinNotFoundError),
499}
500
501impl From<slatedb::Error> for DeleteBasinError {
502    fn from(err: slatedb::Error) -> Self {
503        if err.kind() == slatedb::ErrorKind::Transaction {
504            Self::TransactionConflict(TransactionConflictError)
505        } else {
506            Self::Storage(err.into())
507        }
508    }
509}