Skip to main content

s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    basin::BasinName,
5    encryption::EncryptionSpecResolutionError,
6    record::{FencingToken, SeqNum, StreamPosition},
7    stream::StreamName,
8};
9use s2_storage::record::RecordDecryptionError;
10
11use crate::backend::kv;
12
13#[derive(Debug, Clone, thiserror::Error)]
14pub enum StorageError {
15    #[error("deserialization: {0}")]
16    Deserialization(#[from] kv::DeserializationError),
17    #[error("database: {0}")]
18    Database(Arc<slatedb::Error>),
19}
20
21impl From<slatedb::Error> for StorageError {
22    fn from(error: slatedb::Error) -> Self {
23        StorageError::Database(Arc::new(error))
24    }
25}
26
27#[derive(Debug, Clone, thiserror::Error)]
28#[error("basin `{basin}` not found")]
29pub struct BasinNotFoundError {
30    pub basin: BasinName,
31}
32
33#[derive(Debug, Clone, thiserror::Error)]
34#[error("stream `{stream}` in basin `{basin}` not found")]
35pub struct StreamNotFoundError {
36    pub basin: BasinName,
37    pub stream: StreamName,
38}
39
40#[derive(Debug, Clone, thiserror::Error)]
41#[error("basin `{basin}` already exists")]
42pub struct BasinAlreadyExistsError {
43    pub basin: BasinName,
44}
45
46#[derive(Debug, Clone, thiserror::Error)]
47#[error("stream `{stream}` in basin `{basin}` already exists")]
48pub struct StreamAlreadyExistsError {
49    pub basin: BasinName,
50    pub stream: StreamName,
51}
52
53#[derive(Debug, Clone, thiserror::Error)]
54#[error("basin `{basin}` is being deleted")]
55pub struct BasinDeletionPendingError {
56    pub basin: BasinName,
57}
58
59#[derive(Debug, Clone, thiserror::Error)]
60#[error("stream deletion pending")]
61pub struct StreamDeletionPendingError;
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
81pub struct MaxSeqNumError {
82    pub first_seq_num: SeqNum,
83    pub assigned_seq_num: SeqNum,
84    pub max_assignable_seq_num: SeqNum,
85}
86
87#[derive(Debug, Clone, thiserror::Error)]
88#[error("transaction conflict occurred – this is usually retriable")]
89pub struct TransactionConflictError;
90
91#[derive(Debug, Clone, thiserror::Error)]
92pub enum StreamerError {
93    #[error(transparent)]
94    Storage(#[from] StorageError),
95    #[error(transparent)]
96    StreamNotFound(#[from] StreamNotFoundError),
97    #[error(transparent)]
98    StreamDeletionPending(#[from] StreamDeletionPendingError),
99}
100
101#[derive(Debug, Clone, thiserror::Error)]
102pub(super) enum AppendErrorInternal {
103    #[error(transparent)]
104    Storage(#[from] StorageError),
105    #[error(transparent)]
106    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
107    #[error(transparent)]
108    RequestDroppedError(#[from] RequestDroppedError),
109    #[error(transparent)]
110    StreamDeletionPending(#[from] StreamDeletionPendingError),
111    #[error(transparent)]
112    ConditionFailed(#[from] AppendConditionFailedError),
113    #[error(transparent)]
114    TimestampMissing(#[from] AppendTimestampRequiredError),
115    #[error(transparent)]
116    MaxSeqNum(#[from] MaxSeqNumError),
117}
118
119impl AppendErrorInternal {
120    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
121        match self {
122            Self::ConditionFailed(e) => e.durability_dependency(),
123            Self::MaxSeqNum(e) => e.durability_dependency(),
124            _ => ..0,
125        }
126    }
127}
128
129#[derive(Debug, Clone, thiserror::Error)]
130pub enum CheckTailError {
131    #[error(transparent)]
132    Storage(#[from] StorageError),
133    #[error(transparent)]
134    TransactionConflict(#[from] TransactionConflictError),
135    #[error(transparent)]
136    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
137    #[error(transparent)]
138    BasinNotFound(#[from] BasinNotFoundError),
139    #[error(transparent)]
140    StreamNotFound(#[from] StreamNotFoundError),
141    #[error(transparent)]
142    BasinDeletionPending(#[from] BasinDeletionPendingError),
143    #[error(transparent)]
144    StreamDeletionPending(#[from] StreamDeletionPendingError),
145}
146
147impl From<StreamerError> for CheckTailError {
148    fn from(e: StreamerError) -> Self {
149        match e {
150            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
151            StreamerError::Storage(e) => Self::Storage(e),
152            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
153        }
154    }
155}
156
157#[derive(Debug, Clone, thiserror::Error)]
158pub enum AppendError {
159    #[error(transparent)]
160    Storage(#[from] StorageError),
161    #[error(transparent)]
162    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
163    #[error(transparent)]
164    TransactionConflict(#[from] TransactionConflictError),
165    #[error(transparent)]
166    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
167    #[error(transparent)]
168    RequestDroppedError(#[from] RequestDroppedError),
169    #[error(transparent)]
170    BasinNotFound(#[from] BasinNotFoundError),
171    #[error(transparent)]
172    StreamNotFound(#[from] StreamNotFoundError),
173    #[error(transparent)]
174    BasinDeletionPending(#[from] BasinDeletionPendingError),
175    #[error(transparent)]
176    StreamDeletionPending(#[from] StreamDeletionPendingError),
177    #[error(transparent)]
178    ConditionFailed(#[from] AppendConditionFailedError),
179    #[error(transparent)]
180    TimestampMissing(#[from] AppendTimestampRequiredError),
181    #[error(transparent)]
182    MaxSeqNum(#[from] MaxSeqNumError),
183}
184
185impl From<AppendErrorInternal> for AppendError {
186    fn from(e: AppendErrorInternal) -> Self {
187        match e {
188            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
189            AppendErrorInternal::StreamerMissingInActionError(e) => {
190                AppendError::StreamerMissingInActionError(e)
191            }
192            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
193            AppendErrorInternal::StreamDeletionPending(e) => AppendError::StreamDeletionPending(e),
194            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
195            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
196            AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
197        }
198    }
199}
200
201#[derive(Debug, Clone, thiserror::Error)]
202pub enum AppendConditionFailedError {
203    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
204    FencingTokenMismatch {
205        expected: FencingToken,
206        actual: FencingToken,
207        applied_point: RangeTo<SeqNum>,
208    },
209    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
210    SeqNumMismatch {
211        assigned_seq_num: SeqNum,
212        match_seq_num: SeqNum,
213    },
214}
215
216impl AppendConditionFailedError {
217    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
218        use AppendConditionFailedError::*;
219        match self {
220            SeqNumMismatch {
221                assigned_seq_num, ..
222            } => ..*assigned_seq_num,
223            FencingTokenMismatch { applied_point, .. } => *applied_point,
224        }
225    }
226}
227
228impl MaxSeqNumError {
229    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
230        ..self.first_seq_num
231    }
232}
233
234impl From<StreamerError> for AppendError {
235    fn from(e: StreamerError) -> Self {
236        match e {
237            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
238            StreamerError::Storage(e) => Self::Storage(e),
239            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
240        }
241    }
242}
243
244#[derive(Debug, Clone, thiserror::Error)]
245pub enum ReadError {
246    #[error(transparent)]
247    Storage(#[from] StorageError),
248    #[error(transparent)]
249    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
250    #[error(transparent)]
251    RecordDecryption(#[from] RecordDecryptionError),
252    #[error(transparent)]
253    TransactionConflict(#[from] TransactionConflictError),
254    #[error(transparent)]
255    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
256    #[error(transparent)]
257    BasinNotFound(#[from] BasinNotFoundError),
258    #[error(transparent)]
259    StreamNotFound(#[from] StreamNotFoundError),
260    #[error(transparent)]
261    BasinDeletionPending(#[from] BasinDeletionPendingError),
262    #[error(transparent)]
263    StreamDeletionPending(#[from] StreamDeletionPendingError),
264    #[error(transparent)]
265    Unwritten(#[from] UnwrittenError),
266}
267
268impl From<StreamerError> for ReadError {
269    fn from(e: StreamerError) -> Self {
270        match e {
271            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
272            StreamerError::Storage(e) => Self::Storage(e),
273            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
274        }
275    }
276}
277
278impl From<kv::DeserializationError> for ReadError {
279    fn from(e: kv::DeserializationError) -> Self {
280        Self::Storage(e.into())
281    }
282}
283
284impl From<slatedb::Error> for ReadError {
285    fn from(e: slatedb::Error) -> Self {
286        Self::Storage(e.into())
287    }
288}
289
290#[derive(Debug, Clone, thiserror::Error)]
291pub enum ListStreamsError {
292    #[error(transparent)]
293    Storage(#[from] StorageError),
294}
295
296impl From<slatedb::Error> for ListStreamsError {
297    fn from(e: slatedb::Error) -> Self {
298        Self::Storage(e.into())
299    }
300}
301
302impl From<kv::DeserializationError> for ListStreamsError {
303    fn from(e: kv::DeserializationError) -> Self {
304        Self::Storage(e.into())
305    }
306}
307
308#[derive(Debug, Clone, thiserror::Error)]
309pub enum ProvisionStreamError {
310    #[error(transparent)]
311    Storage(#[from] StorageError),
312    #[error(transparent)]
313    TransactionConflict(#[from] TransactionConflictError),
314    #[error(transparent)]
315    BasinNotFound(#[from] BasinNotFoundError),
316    #[error(transparent)]
317    BasinDeletionPending(#[from] BasinDeletionPendingError),
318    #[error(transparent)]
319    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
320    #[error(transparent)]
321    StreamDeletionPending(#[from] StreamDeletionPendingError),
322    #[error(transparent)]
323    Validation(#[from] s2_common::ValidationError),
324}
325
326impl From<slatedb::Error> for ProvisionStreamError {
327    fn from(err: slatedb::Error) -> Self {
328        if err.kind() == slatedb::ErrorKind::Transaction {
329            Self::TransactionConflict(TransactionConflictError)
330        } else {
331            Self::Storage(err.into())
332        }
333    }
334}
335
336impl From<GetBasinConfigError> for ProvisionStreamError {
337    fn from(err: GetBasinConfigError) -> Self {
338        match err {
339            GetBasinConfigError::Storage(e) => Self::Storage(e),
340            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
341        }
342    }
343}
344
345#[derive(Debug, Clone, thiserror::Error)]
346pub enum GetStreamConfigError {
347    #[error(transparent)]
348    Storage(#[from] StorageError),
349    #[error(transparent)]
350    StreamNotFound(#[from] StreamNotFoundError),
351    #[error(transparent)]
352    StreamDeletionPending(#[from] StreamDeletionPendingError),
353}
354
355#[derive(Debug, Clone, thiserror::Error)]
356pub enum DeleteStreamError {
357    #[error(transparent)]
358    Storage(#[from] StorageError),
359    #[error(transparent)]
360    TransactionConflict(#[from] TransactionConflictError),
361    #[error(transparent)]
362    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
363    #[error(transparent)]
364    RequestDroppedError(#[from] RequestDroppedError),
365    #[error(transparent)]
366    StreamNotFound(#[from] StreamNotFoundError),
367}
368
369impl From<slatedb::Error> for DeleteStreamError {
370    fn from(err: slatedb::Error) -> Self {
371        if err.kind() == slatedb::ErrorKind::Transaction {
372            Self::TransactionConflict(TransactionConflictError)
373        } else {
374            Self::Storage(err.into())
375        }
376    }
377}
378
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum BasinDeletionError {
381    #[error(transparent)]
382    Storage(#[from] StorageError),
383    #[error(transparent)]
384    DeleteStream(#[from] DeleteStreamError),
385}
386
387#[derive(Debug, Clone, thiserror::Error)]
388pub enum StreamDeleteOnEmptyError {
389    #[error(transparent)]
390    Storage(#[from] StorageError),
391    #[error(transparent)]
392    DeleteStream(#[from] DeleteStreamError),
393}
394
395#[derive(Debug, Clone, thiserror::Error)]
396pub enum ListBasinsError {
397    #[error(transparent)]
398    Storage(#[from] StorageError),
399}
400
401impl From<slatedb::Error> for ListBasinsError {
402    fn from(err: slatedb::Error) -> Self {
403        Self::Storage(err.into())
404    }
405}
406
407impl From<kv::DeserializationError> for ListBasinsError {
408    fn from(e: kv::DeserializationError) -> Self {
409        Self::Storage(e.into())
410    }
411}
412
413#[derive(Debug, Clone, thiserror::Error)]
414pub enum ProvisionBasinError {
415    #[error(transparent)]
416    Storage(#[from] StorageError),
417    #[error(transparent)]
418    TransactionConflict(#[from] TransactionConflictError),
419    #[error(transparent)]
420    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
421    #[error(transparent)]
422    BasinDeletionPending(#[from] BasinDeletionPendingError),
423}
424
425impl From<slatedb::Error> for ProvisionBasinError {
426    fn from(err: slatedb::Error) -> Self {
427        if err.kind() == slatedb::ErrorKind::Transaction {
428            Self::TransactionConflict(TransactionConflictError)
429        } else {
430            Self::Storage(err.into())
431        }
432    }
433}
434
435#[derive(Debug, Clone, thiserror::Error)]
436pub enum GetBasinConfigError {
437    #[error(transparent)]
438    Storage(#[from] StorageError),
439    #[error(transparent)]
440    BasinNotFound(#[from] BasinNotFoundError),
441}
442
443#[derive(Debug, Clone, thiserror::Error)]
444pub enum ReconfigureBasinError {
445    #[error(transparent)]
446    Storage(#[from] StorageError),
447    #[error(transparent)]
448    TransactionConflict(#[from] TransactionConflictError),
449    #[error(transparent)]
450    BasinNotFound(#[from] BasinNotFoundError),
451    #[error(transparent)]
452    BasinDeletionPending(#[from] BasinDeletionPendingError),
453}
454
455impl From<slatedb::Error> for ReconfigureBasinError {
456    fn from(err: slatedb::Error) -> Self {
457        if err.kind() == slatedb::ErrorKind::Transaction {
458            Self::TransactionConflict(TransactionConflictError)
459        } else {
460            Self::Storage(err.into())
461        }
462    }
463}
464
465#[derive(Debug, Clone, thiserror::Error)]
466pub enum ReconfigureStreamError {
467    #[error(transparent)]
468    Storage(#[from] StorageError),
469    #[error(transparent)]
470    TransactionConflict(#[from] TransactionConflictError),
471    #[error(transparent)]
472    BasinNotFound(#[from] BasinNotFoundError),
473    #[error(transparent)]
474    BasinDeletionPending(#[from] BasinDeletionPendingError),
475    #[error(transparent)]
476    StreamNotFound(#[from] StreamNotFoundError),
477    #[error(transparent)]
478    StreamDeletionPending(#[from] StreamDeletionPendingError),
479    #[error(transparent)]
480    Validation(#[from] s2_common::ValidationError),
481}
482
483impl From<slatedb::Error> for ReconfigureStreamError {
484    fn from(err: slatedb::Error) -> Self {
485        if err.kind() == slatedb::ErrorKind::Transaction {
486            Self::TransactionConflict(TransactionConflictError)
487        } else {
488            Self::Storage(err.into())
489        }
490    }
491}
492
493#[derive(Debug, Clone, thiserror::Error)]
494pub enum DeleteBasinError {
495    #[error(transparent)]
496    Storage(#[from] StorageError),
497    #[error(transparent)]
498    TransactionConflict(#[from] TransactionConflictError),
499    #[error(transparent)]
500    BasinNotFound(#[from] BasinNotFoundError),
501}
502
503impl From<slatedb::Error> for DeleteBasinError {
504    fn from(err: slatedb::Error) -> Self {
505        if err.kind() == slatedb::ErrorKind::Transaction {
506            Self::TransactionConflict(TransactionConflictError)
507        } else {
508            Self::Storage(err.into())
509        }
510    }
511}