Skip to main content

s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    encryption::EncryptionSpecResolutionError,
5    record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6    types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13    #[error("deserialization: {0}")]
14    Deserialization(#[from] kv::DeserializationError),
15    #[error("database: {0}")]
16    Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20    fn from(error: slatedb::Error) -> Self {
21        StorageError::Database(Arc::new(error))
22    }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28    pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34    pub basin: BasinName,
35    pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41    pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47    pub basin: BasinName,
48    pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54    pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60    pub basin: BasinName,
61    pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error(
82    "stream record limit exceeded: max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}"
83)]
84pub struct StreamRecordLimitExceededError {
85    pub first_seq_num: SeqNum,
86    pub assigned_seq_num: SeqNum,
87    pub max_assignable_seq_num: SeqNum,
88}
89
90#[derive(Debug, Clone, thiserror::Error)]
91#[error("transaction conflict occurred – this is usually retriable")]
92pub struct TransactionConflictError;
93
94#[derive(Debug, Clone, thiserror::Error)]
95pub enum StreamerError {
96    #[error(transparent)]
97    Storage(#[from] StorageError),
98    #[error(transparent)]
99    StreamNotFound(#[from] StreamNotFoundError),
100    #[error(transparent)]
101    StreamDeletionPending(#[from] StreamDeletionPendingError),
102}
103
104#[derive(Debug, Clone, thiserror::Error)]
105pub(super) enum AppendErrorInternal {
106    #[error(transparent)]
107    Storage(#[from] StorageError),
108    #[error(transparent)]
109    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
110    #[error(transparent)]
111    RequestDroppedError(#[from] RequestDroppedError),
112    #[error(transparent)]
113    ConditionFailed(#[from] AppendConditionFailedError),
114    #[error(transparent)]
115    TimestampMissing(#[from] AppendTimestampRequiredError),
116    #[error(transparent)]
117    StreamRecordLimitExceeded(#[from] StreamRecordLimitExceededError),
118}
119
120impl AppendErrorInternal {
121    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
122        match self {
123            Self::ConditionFailed(e) => e.durability_dependency(),
124            Self::StreamRecordLimitExceeded(e) => e.durability_dependency(),
125            _ => ..0,
126        }
127    }
128}
129
130#[derive(Debug, Clone, thiserror::Error)]
131pub enum CheckTailError {
132    #[error(transparent)]
133    Storage(#[from] StorageError),
134    #[error(transparent)]
135    TransactionConflict(#[from] TransactionConflictError),
136    #[error(transparent)]
137    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
138    #[error(transparent)]
139    BasinNotFound(#[from] BasinNotFoundError),
140    #[error(transparent)]
141    StreamNotFound(#[from] StreamNotFoundError),
142    #[error(transparent)]
143    BasinDeletionPending(#[from] BasinDeletionPendingError),
144    #[error(transparent)]
145    StreamDeletionPending(#[from] StreamDeletionPendingError),
146}
147
148impl From<StreamerError> for CheckTailError {
149    fn from(e: StreamerError) -> Self {
150        match e {
151            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
152            StreamerError::Storage(e) => Self::Storage(e),
153            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
154        }
155    }
156}
157
158#[derive(Debug, Clone, thiserror::Error)]
159pub enum AppendError {
160    #[error(transparent)]
161    Storage(#[from] StorageError),
162    #[error(transparent)]
163    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
164    #[error(transparent)]
165    TransactionConflict(#[from] TransactionConflictError),
166    #[error(transparent)]
167    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
168    #[error(transparent)]
169    RequestDroppedError(#[from] RequestDroppedError),
170    #[error(transparent)]
171    BasinNotFound(#[from] BasinNotFoundError),
172    #[error(transparent)]
173    StreamNotFound(#[from] StreamNotFoundError),
174    #[error(transparent)]
175    BasinDeletionPending(#[from] BasinDeletionPendingError),
176    #[error(transparent)]
177    StreamDeletionPending(#[from] StreamDeletionPendingError),
178    #[error(transparent)]
179    ConditionFailed(#[from] AppendConditionFailedError),
180    #[error(transparent)]
181    TimestampMissing(#[from] AppendTimestampRequiredError),
182    #[error(transparent)]
183    StreamRecordLimitExceeded(#[from] StreamRecordLimitExceededError),
184}
185
186impl From<AppendErrorInternal> for AppendError {
187    fn from(e: AppendErrorInternal) -> Self {
188        match e {
189            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
190            AppendErrorInternal::StreamerMissingInActionError(e) => {
191                AppendError::StreamerMissingInActionError(e)
192            }
193            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
194            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
195            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
196            AppendErrorInternal::StreamRecordLimitExceeded(e) => {
197                AppendError::StreamRecordLimitExceeded(e)
198            }
199        }
200    }
201}
202
203#[derive(Debug, Clone, thiserror::Error)]
204pub enum AppendConditionFailedError {
205    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
206    FencingTokenMismatch {
207        expected: FencingToken,
208        actual: FencingToken,
209        applied_point: RangeTo<SeqNum>,
210    },
211    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
212    SeqNumMismatch {
213        assigned_seq_num: SeqNum,
214        match_seq_num: SeqNum,
215    },
216}
217
218impl AppendConditionFailedError {
219    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
220        use AppendConditionFailedError::*;
221        match self {
222            SeqNumMismatch {
223                assigned_seq_num, ..
224            } => ..*assigned_seq_num,
225            FencingTokenMismatch { applied_point, .. } => *applied_point,
226        }
227    }
228}
229
230impl StreamRecordLimitExceededError {
231    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
232        ..self.first_seq_num
233    }
234}
235
236impl From<StreamerError> for AppendError {
237    fn from(e: StreamerError) -> Self {
238        match e {
239            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
240            StreamerError::Storage(e) => Self::Storage(e),
241            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
242        }
243    }
244}
245
246#[derive(Debug, Clone, thiserror::Error)]
247pub enum ReadError {
248    #[error(transparent)]
249    Storage(#[from] StorageError),
250    #[error(transparent)]
251    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
252    #[error(transparent)]
253    RecordDecryption(#[from] RecordDecryptionError),
254    #[error(transparent)]
255    TransactionConflict(#[from] TransactionConflictError),
256    #[error(transparent)]
257    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
258    #[error(transparent)]
259    BasinNotFound(#[from] BasinNotFoundError),
260    #[error(transparent)]
261    StreamNotFound(#[from] StreamNotFoundError),
262    #[error(transparent)]
263    BasinDeletionPending(#[from] BasinDeletionPendingError),
264    #[error(transparent)]
265    StreamDeletionPending(#[from] StreamDeletionPendingError),
266    #[error(transparent)]
267    Unwritten(#[from] UnwrittenError),
268}
269
270impl From<StreamerError> for ReadError {
271    fn from(e: StreamerError) -> Self {
272        match e {
273            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
274            StreamerError::Storage(e) => Self::Storage(e),
275            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
276        }
277    }
278}
279
280impl From<kv::DeserializationError> for ReadError {
281    fn from(e: kv::DeserializationError) -> Self {
282        Self::Storage(e.into())
283    }
284}
285
286impl From<slatedb::Error> for ReadError {
287    fn from(e: slatedb::Error) -> Self {
288        Self::Storage(e.into())
289    }
290}
291
292#[derive(Debug, Clone, thiserror::Error)]
293pub enum ListStreamsError {
294    #[error(transparent)]
295    Storage(#[from] StorageError),
296}
297
298impl From<slatedb::Error> for ListStreamsError {
299    fn from(e: slatedb::Error) -> Self {
300        Self::Storage(e.into())
301    }
302}
303
304impl From<kv::DeserializationError> for ListStreamsError {
305    fn from(e: kv::DeserializationError) -> Self {
306        Self::Storage(e.into())
307    }
308}
309
310#[derive(Debug, Clone, thiserror::Error)]
311pub enum CreateStreamError {
312    #[error(transparent)]
313    Storage(#[from] StorageError),
314    #[error(transparent)]
315    TransactionConflict(#[from] TransactionConflictError),
316    #[error(transparent)]
317    BasinNotFound(#[from] BasinNotFoundError),
318    #[error(transparent)]
319    BasinDeletionPending(#[from] BasinDeletionPendingError),
320    #[error(transparent)]
321    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
322    #[error(transparent)]
323    StreamDeletionPending(#[from] StreamDeletionPendingError),
324    #[error(transparent)]
325    Validation(#[from] s2_common::types::ValidationError),
326}
327
328impl From<slatedb::Error> for CreateStreamError {
329    fn from(err: slatedb::Error) -> Self {
330        if err.kind() == slatedb::ErrorKind::Transaction {
331            Self::TransactionConflict(TransactionConflictError)
332        } else {
333            Self::Storage(err.into())
334        }
335    }
336}
337
338impl From<GetBasinConfigError> for CreateStreamError {
339    fn from(err: GetBasinConfigError) -> Self {
340        match err {
341            GetBasinConfigError::Storage(e) => Self::Storage(e),
342            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
343        }
344    }
345}
346
347#[derive(Debug, Clone, thiserror::Error)]
348pub enum GetStreamConfigError {
349    #[error(transparent)]
350    Storage(#[from] StorageError),
351    #[error(transparent)]
352    StreamNotFound(#[from] StreamNotFoundError),
353    #[error(transparent)]
354    StreamDeletionPending(#[from] StreamDeletionPendingError),
355}
356
357#[derive(Debug, Clone, thiserror::Error)]
358pub enum DeleteStreamError {
359    #[error(transparent)]
360    Storage(#[from] StorageError),
361    #[error(transparent)]
362    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
363    #[error(transparent)]
364    RequestDroppedError(#[from] RequestDroppedError),
365    #[error(transparent)]
366    StreamNotFound(#[from] StreamNotFoundError),
367}
368
369impl From<slatedb::Error> for DeleteStreamError {
370    fn from(err: slatedb::Error) -> Self {
371        Self::Storage(err.into())
372    }
373}
374
375#[derive(Debug, Clone, thiserror::Error)]
376pub enum BasinDeletionError {
377    #[error(transparent)]
378    Storage(#[from] StorageError),
379    #[error(transparent)]
380    DeleteStream(#[from] DeleteStreamError),
381}
382
383#[derive(Debug, Clone, thiserror::Error)]
384pub enum StreamDeleteOnEmptyError {
385    #[error(transparent)]
386    Storage(#[from] StorageError),
387    #[error(transparent)]
388    DeleteStream(#[from] DeleteStreamError),
389}
390
391#[derive(Debug, Clone, thiserror::Error)]
392pub enum ListBasinsError {
393    #[error(transparent)]
394    Storage(#[from] StorageError),
395}
396
397impl From<slatedb::Error> for ListBasinsError {
398    fn from(err: slatedb::Error) -> Self {
399        Self::Storage(err.into())
400    }
401}
402
403impl From<kv::DeserializationError> for ListBasinsError {
404    fn from(e: kv::DeserializationError) -> Self {
405        Self::Storage(e.into())
406    }
407}
408
409#[derive(Debug, Clone, thiserror::Error)]
410pub enum CreateBasinError {
411    #[error(transparent)]
412    Storage(#[from] StorageError),
413    #[error(transparent)]
414    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
415    #[error(transparent)]
416    BasinDeletionPending(#[from] BasinDeletionPendingError),
417}
418
419impl From<slatedb::Error> for CreateBasinError {
420    fn from(err: slatedb::Error) -> Self {
421        Self::Storage(err.into())
422    }
423}
424
425#[derive(Debug, Clone, thiserror::Error)]
426pub enum GetBasinConfigError {
427    #[error(transparent)]
428    Storage(#[from] StorageError),
429    #[error(transparent)]
430    BasinNotFound(#[from] BasinNotFoundError),
431}
432
433#[derive(Debug, Clone, thiserror::Error)]
434pub enum ReconfigureBasinError {
435    #[error(transparent)]
436    Storage(#[from] StorageError),
437    #[error(transparent)]
438    TransactionConflict(#[from] TransactionConflictError),
439    #[error(transparent)]
440    BasinNotFound(#[from] BasinNotFoundError),
441    #[error(transparent)]
442    BasinDeletionPending(#[from] BasinDeletionPendingError),
443}
444
445impl From<slatedb::Error> for ReconfigureBasinError {
446    fn from(err: slatedb::Error) -> Self {
447        if err.kind() == slatedb::ErrorKind::Transaction {
448            Self::TransactionConflict(TransactionConflictError)
449        } else {
450            Self::Storage(err.into())
451        }
452    }
453}
454
455#[derive(Debug, Clone, thiserror::Error)]
456pub enum ReconfigureStreamError {
457    #[error(transparent)]
458    Storage(#[from] StorageError),
459    #[error(transparent)]
460    TransactionConflict(#[from] TransactionConflictError),
461    #[error(transparent)]
462    BasinNotFound(#[from] BasinNotFoundError),
463    #[error(transparent)]
464    StreamNotFound(#[from] StreamNotFoundError),
465    #[error(transparent)]
466    StreamDeletionPending(#[from] StreamDeletionPendingError),
467    #[error(transparent)]
468    Validation(#[from] s2_common::types::ValidationError),
469}
470
471impl From<slatedb::Error> for ReconfigureStreamError {
472    fn from(err: slatedb::Error) -> Self {
473        if err.kind() == slatedb::ErrorKind::Transaction {
474            Self::TransactionConflict(TransactionConflictError)
475        } else {
476            Self::Storage(err.into())
477        }
478    }
479}
480
481#[derive(Debug, Clone, thiserror::Error)]
482pub enum DeleteBasinError {
483    #[error(transparent)]
484    Storage(#[from] StorageError),
485    #[error(transparent)]
486    BasinNotFound(#[from] BasinNotFoundError),
487}
488
489impl From<slatedb::Error> for DeleteBasinError {
490    fn from(err: slatedb::Error) -> Self {
491        Self::Storage(err.into())
492    }
493}