// s2_lite/backend/error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    encryption::EncryptionSpecResolutionError,
5    record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6    types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13    #[error("deserialization: {0}")]
14    Deserialization(#[from] kv::DeserializationError),
15    #[error("database: {0}")]
16    Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20    fn from(error: slatedb::Error) -> Self {
21        StorageError::Database(Arc::new(error))
22    }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28    pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34    pub basin: BasinName,
35    pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41    pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47    pub basin: BasinName,
48    pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54    pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60    pub basin: BasinName,
61    pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
82pub struct MaxSeqNumError {
83    pub first_seq_num: SeqNum,
84    pub assigned_seq_num: SeqNum,
85    pub max_assignable_seq_num: SeqNum,
86}
87
88#[derive(Debug, Clone, thiserror::Error)]
89#[error("transaction conflict occurred – this is usually retriable")]
90pub struct TransactionConflictError;
91
92#[derive(Debug, Clone, thiserror::Error)]
93pub enum StreamerError {
94    #[error(transparent)]
95    Storage(#[from] StorageError),
96    #[error(transparent)]
97    StreamNotFound(#[from] StreamNotFoundError),
98    #[error(transparent)]
99    StreamDeletionPending(#[from] StreamDeletionPendingError),
100}
101
102#[derive(Debug, Clone, thiserror::Error)]
103pub(super) enum AppendErrorInternal {
104    #[error(transparent)]
105    Storage(#[from] StorageError),
106    #[error(transparent)]
107    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
108    #[error(transparent)]
109    RequestDroppedError(#[from] RequestDroppedError),
110    #[error(transparent)]
111    ConditionFailed(#[from] AppendConditionFailedError),
112    #[error(transparent)]
113    TimestampMissing(#[from] AppendTimestampRequiredError),
114    #[error(transparent)]
115    MaxSeqNum(#[from] MaxSeqNumError),
116}
117
118impl AppendErrorInternal {
119    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
120        match self {
121            Self::ConditionFailed(e) => e.durability_dependency(),
122            Self::MaxSeqNum(e) => e.durability_dependency(),
123            _ => ..0,
124        }
125    }
126}
127
128#[derive(Debug, Clone, thiserror::Error)]
129pub enum CheckTailError {
130    #[error(transparent)]
131    Storage(#[from] StorageError),
132    #[error(transparent)]
133    TransactionConflict(#[from] TransactionConflictError),
134    #[error(transparent)]
135    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
136    #[error(transparent)]
137    BasinNotFound(#[from] BasinNotFoundError),
138    #[error(transparent)]
139    StreamNotFound(#[from] StreamNotFoundError),
140    #[error(transparent)]
141    BasinDeletionPending(#[from] BasinDeletionPendingError),
142    #[error(transparent)]
143    StreamDeletionPending(#[from] StreamDeletionPendingError),
144}
145
146impl From<StreamerError> for CheckTailError {
147    fn from(e: StreamerError) -> Self {
148        match e {
149            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
150            StreamerError::Storage(e) => Self::Storage(e),
151            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
152        }
153    }
154}
155
156#[derive(Debug, Clone, thiserror::Error)]
157pub enum AppendError {
158    #[error(transparent)]
159    Storage(#[from] StorageError),
160    #[error(transparent)]
161    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
162    #[error(transparent)]
163    TransactionConflict(#[from] TransactionConflictError),
164    #[error(transparent)]
165    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
166    #[error(transparent)]
167    RequestDroppedError(#[from] RequestDroppedError),
168    #[error(transparent)]
169    BasinNotFound(#[from] BasinNotFoundError),
170    #[error(transparent)]
171    StreamNotFound(#[from] StreamNotFoundError),
172    #[error(transparent)]
173    BasinDeletionPending(#[from] BasinDeletionPendingError),
174    #[error(transparent)]
175    StreamDeletionPending(#[from] StreamDeletionPendingError),
176    #[error(transparent)]
177    ConditionFailed(#[from] AppendConditionFailedError),
178    #[error(transparent)]
179    TimestampMissing(#[from] AppendTimestampRequiredError),
180    #[error(transparent)]
181    MaxSeqNum(#[from] MaxSeqNumError),
182}
183
184impl From<AppendErrorInternal> for AppendError {
185    fn from(e: AppendErrorInternal) -> Self {
186        match e {
187            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
188            AppendErrorInternal::StreamerMissingInActionError(e) => {
189                AppendError::StreamerMissingInActionError(e)
190            }
191            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
192            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
193            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
194            AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
195        }
196    }
197}
198
199#[derive(Debug, Clone, thiserror::Error)]
200pub enum AppendConditionFailedError {
201    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
202    FencingTokenMismatch {
203        expected: FencingToken,
204        actual: FencingToken,
205        applied_point: RangeTo<SeqNum>,
206    },
207    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
208    SeqNumMismatch {
209        assigned_seq_num: SeqNum,
210        match_seq_num: SeqNum,
211    },
212}
213
214impl AppendConditionFailedError {
215    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
216        use AppendConditionFailedError::*;
217        match self {
218            SeqNumMismatch {
219                assigned_seq_num, ..
220            } => ..*assigned_seq_num,
221            FencingTokenMismatch { applied_point, .. } => *applied_point,
222        }
223    }
224}
225
226impl MaxSeqNumError {
227    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
228        ..self.first_seq_num
229    }
230}
231
232impl From<StreamerError> for AppendError {
233    fn from(e: StreamerError) -> Self {
234        match e {
235            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
236            StreamerError::Storage(e) => Self::Storage(e),
237            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
238        }
239    }
240}
241
242#[derive(Debug, Clone, thiserror::Error)]
243pub enum ReadError {
244    #[error(transparent)]
245    Storage(#[from] StorageError),
246    #[error(transparent)]
247    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
248    #[error(transparent)]
249    RecordDecryption(#[from] RecordDecryptionError),
250    #[error(transparent)]
251    TransactionConflict(#[from] TransactionConflictError),
252    #[error(transparent)]
253    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
254    #[error(transparent)]
255    BasinNotFound(#[from] BasinNotFoundError),
256    #[error(transparent)]
257    StreamNotFound(#[from] StreamNotFoundError),
258    #[error(transparent)]
259    BasinDeletionPending(#[from] BasinDeletionPendingError),
260    #[error(transparent)]
261    StreamDeletionPending(#[from] StreamDeletionPendingError),
262    #[error(transparent)]
263    Unwritten(#[from] UnwrittenError),
264}
265
266impl From<StreamerError> for ReadError {
267    fn from(e: StreamerError) -> Self {
268        match e {
269            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
270            StreamerError::Storage(e) => Self::Storage(e),
271            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
272        }
273    }
274}
275
276impl From<kv::DeserializationError> for ReadError {
277    fn from(e: kv::DeserializationError) -> Self {
278        Self::Storage(e.into())
279    }
280}
281
282impl From<slatedb::Error> for ReadError {
283    fn from(e: slatedb::Error) -> Self {
284        Self::Storage(e.into())
285    }
286}
287
288#[derive(Debug, Clone, thiserror::Error)]
289pub enum ListStreamsError {
290    #[error(transparent)]
291    Storage(#[from] StorageError),
292}
293
294impl From<slatedb::Error> for ListStreamsError {
295    fn from(e: slatedb::Error) -> Self {
296        Self::Storage(e.into())
297    }
298}
299
300impl From<kv::DeserializationError> for ListStreamsError {
301    fn from(e: kv::DeserializationError) -> Self {
302        Self::Storage(e.into())
303    }
304}
305
306#[derive(Debug, Clone, thiserror::Error)]
307pub enum CreateStreamError {
308    #[error(transparent)]
309    Storage(#[from] StorageError),
310    #[error(transparent)]
311    TransactionConflict(#[from] TransactionConflictError),
312    #[error(transparent)]
313    BasinNotFound(#[from] BasinNotFoundError),
314    #[error(transparent)]
315    BasinDeletionPending(#[from] BasinDeletionPendingError),
316    #[error(transparent)]
317    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
318    #[error(transparent)]
319    StreamDeletionPending(#[from] StreamDeletionPendingError),
320    #[error(transparent)]
321    Validation(#[from] s2_common::types::ValidationError),
322}
323
324impl From<slatedb::Error> for CreateStreamError {
325    fn from(err: slatedb::Error) -> Self {
326        if err.kind() == slatedb::ErrorKind::Transaction {
327            Self::TransactionConflict(TransactionConflictError)
328        } else {
329            Self::Storage(err.into())
330        }
331    }
332}
333
334impl From<GetBasinConfigError> for CreateStreamError {
335    fn from(err: GetBasinConfigError) -> Self {
336        match err {
337            GetBasinConfigError::Storage(e) => Self::Storage(e),
338            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
339        }
340    }
341}
342
343#[derive(Debug, Clone, thiserror::Error)]
344pub enum GetStreamConfigError {
345    #[error(transparent)]
346    Storage(#[from] StorageError),
347    #[error(transparent)]
348    StreamNotFound(#[from] StreamNotFoundError),
349    #[error(transparent)]
350    StreamDeletionPending(#[from] StreamDeletionPendingError),
351}
352
353#[derive(Debug, Clone, thiserror::Error)]
354pub enum DeleteStreamError {
355    #[error(transparent)]
356    Storage(#[from] StorageError),
357    #[error(transparent)]
358    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
359    #[error(transparent)]
360    RequestDroppedError(#[from] RequestDroppedError),
361    #[error(transparent)]
362    StreamNotFound(#[from] StreamNotFoundError),
363}
364
365impl From<slatedb::Error> for DeleteStreamError {
366    fn from(err: slatedb::Error) -> Self {
367        Self::Storage(err.into())
368    }
369}
370
371#[derive(Debug, Clone, thiserror::Error)]
372pub enum BasinDeletionError {
373    #[error(transparent)]
374    Storage(#[from] StorageError),
375    #[error(transparent)]
376    DeleteStream(#[from] DeleteStreamError),
377}
378
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum StreamDeleteOnEmptyError {
381    #[error(transparent)]
382    Storage(#[from] StorageError),
383    #[error(transparent)]
384    DeleteStream(#[from] DeleteStreamError),
385}
386
387#[derive(Debug, Clone, thiserror::Error)]
388pub enum ListBasinsError {
389    #[error(transparent)]
390    Storage(#[from] StorageError),
391}
392
393impl From<slatedb::Error> for ListBasinsError {
394    fn from(err: slatedb::Error) -> Self {
395        Self::Storage(err.into())
396    }
397}
398
399impl From<kv::DeserializationError> for ListBasinsError {
400    fn from(e: kv::DeserializationError) -> Self {
401        Self::Storage(e.into())
402    }
403}
404
405#[derive(Debug, Clone, thiserror::Error)]
406pub enum CreateBasinError {
407    #[error(transparent)]
408    Storage(#[from] StorageError),
409    #[error(transparent)]
410    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
411    #[error(transparent)]
412    BasinDeletionPending(#[from] BasinDeletionPendingError),
413}
414
415impl From<slatedb::Error> for CreateBasinError {
416    fn from(err: slatedb::Error) -> Self {
417        Self::Storage(err.into())
418    }
419}
420
421#[derive(Debug, Clone, thiserror::Error)]
422pub enum GetBasinConfigError {
423    #[error(transparent)]
424    Storage(#[from] StorageError),
425    #[error(transparent)]
426    BasinNotFound(#[from] BasinNotFoundError),
427}
428
429#[derive(Debug, Clone, thiserror::Error)]
430pub enum ReconfigureBasinError {
431    #[error(transparent)]
432    Storage(#[from] StorageError),
433    #[error(transparent)]
434    TransactionConflict(#[from] TransactionConflictError),
435    #[error(transparent)]
436    BasinNotFound(#[from] BasinNotFoundError),
437    #[error(transparent)]
438    BasinDeletionPending(#[from] BasinDeletionPendingError),
439}
440
441impl From<slatedb::Error> for ReconfigureBasinError {
442    fn from(err: slatedb::Error) -> Self {
443        if err.kind() == slatedb::ErrorKind::Transaction {
444            Self::TransactionConflict(TransactionConflictError)
445        } else {
446            Self::Storage(err.into())
447        }
448    }
449}
450
451#[derive(Debug, Clone, thiserror::Error)]
452pub enum ReconfigureStreamError {
453    #[error(transparent)]
454    Storage(#[from] StorageError),
455    #[error(transparent)]
456    TransactionConflict(#[from] TransactionConflictError),
457    #[error(transparent)]
458    BasinNotFound(#[from] BasinNotFoundError),
459    #[error(transparent)]
460    StreamNotFound(#[from] StreamNotFoundError),
461    #[error(transparent)]
462    StreamDeletionPending(#[from] StreamDeletionPendingError),
463    #[error(transparent)]
464    Validation(#[from] s2_common::types::ValidationError),
465}
466
467impl From<slatedb::Error> for ReconfigureStreamError {
468    fn from(err: slatedb::Error) -> Self {
469        if err.kind() == slatedb::ErrorKind::Transaction {
470            Self::TransactionConflict(TransactionConflictError)
471        } else {
472            Self::Storage(err.into())
473        }
474    }
475}
476
477#[derive(Debug, Clone, thiserror::Error)]
478pub enum DeleteBasinError {
479    #[error(transparent)]
480    Storage(#[from] StorageError),
481    #[error(transparent)]
482    BasinNotFound(#[from] BasinNotFoundError),
483}
484
485impl From<slatedb::Error> for DeleteBasinError {
486    fn from(err: slatedb::Error) -> Self {
487        Self::Storage(err.into())
488    }
489}