// s2_lite/backend/error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    encryption::EncryptionSpecResolutionError,
5    record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6    types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
/// Failure originating in the storage layer.
///
/// Derives `Clone`; the database error is held behind an [`Arc`], so cloning
/// this enum only bumps a reference count for that variant.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StorageError {
    /// A stored value could not be deserialized. `#[from]` generates the
    /// `From<kv::DeserializationError>` conversion.
    #[error("deserialization: {0}")]
    Deserialization(#[from] kv::DeserializationError),
    /// An error reported by slatedb, shared via `Arc`.
    #[error("database: {0}")]
    Database(Arc<slatedb::Error>),
}
18
19impl From<slatedb::Error> for StorageError {
20    fn from(error: slatedb::Error) -> Self {
21        StorageError::Database(Arc::new(error))
22    }
23}
24
/// Error reporting that a basin was not found.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` not found")]
pub struct BasinNotFoundError {
    /// Name of the basin reported as not found (interpolated into the message).
    pub basin: BasinName,
}
30
/// Error reporting that a stream was not found in a basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` not found")]
pub struct StreamNotFoundError {
    /// Basin named in the message.
    pub basin: BasinName,
    /// Stream reported as not found.
    pub stream: StreamName,
}
37
/// Error reporting that a basin with the given name already exists.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` already exists")]
pub struct BasinAlreadyExistsError {
    /// Name of the basin that already exists.
    pub basin: BasinName,
}
43
/// Error reporting that a stream with the given name already exists in a basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` already exists")]
pub struct StreamAlreadyExistsError {
    /// Basin named in the message.
    pub basin: BasinName,
    /// Stream that already exists.
    pub stream: StreamName,
}
50
/// Error reporting that a basin is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` is being deleted")]
pub struct BasinDeletionPendingError {
    /// Name of the basin being deleted.
    pub basin: BasinName,
}
56
/// Error reporting that a stream is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` is being deleted")]
pub struct StreamDeletionPendingError {
    /// Basin named in the message.
    pub basin: BasinName,
    /// Stream being deleted.
    pub stream: StreamName,
}
63
/// Error carrying a stream position described by the message as "unwritten".
///
/// NOTE(review): presumably the wrapped position is the stream's current tail
/// — confirm against the code that constructs this error.
#[derive(Debug, Clone, thiserror::Error)]
#[error("unwritten position: {0}")]
pub struct UnwrittenError(pub StreamPosition);
67
/// Unit error whose message is "streamer missing in action".
///
/// NOTE(review): presumably raised when the streamer serving a stream can no
/// longer be reached — confirm at the construction sites.
#[derive(Debug, Clone, thiserror::Error)]
#[error("streamer missing in action")]
pub struct StreamerMissingInActionError;
71
/// Unit error whose message is "request dropped".
///
/// NOTE(review): presumably the request was discarded before a response was
/// produced — confirm at the construction sites.
#[derive(Debug, Clone, thiserror::Error)]
#[error("request dropped")]
pub struct RequestDroppedError;
75
/// Unit error reporting that a record timestamp was required but absent.
#[derive(Debug, Clone, thiserror::Error)]
#[error("record timestamp was required but was missing")]
pub struct AppendTimestampRequiredError;
79
/// Error reporting that an append would assign a sequence number at or beyond
/// the limit that applies to encrypted records on a stream.
#[derive(Debug, Clone, thiserror::Error)]
#[error(
    "stream encrypted record limit exceeded: records must be assigned sequence numbers below {limit}; attempted {assigned_seq_num}"
)]
pub struct StreamEncryptedRecordLimitExceededError {
    /// Sequence number the attempted assignment would have used.
    pub assigned_seq_num: SeqNum,
    /// Exclusive upper bound on assignable sequence numbers, per the message.
    pub limit: SeqNum,
}
88
/// Unit error reporting a transaction conflict; the message notes that this is
/// usually retriable.
#[derive(Debug, Clone, thiserror::Error)]
#[error("transaction conflict occurred – this is usually retriable")]
pub struct TransactionConflictError;
92
/// Streamer-level error that the operation-specific enums in this file
/// (`CheckTailError`, `AppendError`, `ReadError`) convert from.
///
/// Every variant is `#[error(transparent)]`, so `Display` and `source()` are
/// forwarded to the wrapped error, and `#[from]` generates the matching
/// `From` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StreamerError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
102
/// Append failure type visible only to the parent module (`pub(super)`).
///
/// It is widened into the public `AppendError` through a `From` conversion
/// defined in this file. Every variant is transparent: `Display` and
/// `source()` come from the wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub(super) enum AppendErrorInternal {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
    #[error(transparent)]
    StreamEncryptedRecordLimitExceeded(#[from] StreamEncryptedRecordLimitExceededError),
}
118
/// Error type for the check-tail operation (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion from the
/// wrapped error type.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CheckTailError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
136
137impl From<StreamerError> for CheckTailError {
138    fn from(e: StreamerError) -> Self {
139        match e {
140            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
141            StreamerError::Storage(e) => Self::Storage(e),
142            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
143        }
144    }
145}
146
/// Public error type for the append operation (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion from the
/// wrapped error type. `AppendErrorInternal` and `StreamerError` values are
/// mapped into this enum by `From` impls defined in this file.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
    #[error(transparent)]
    StreamEncryptedRecordLimitExceeded(#[from] StreamEncryptedRecordLimitExceededError),
}
174
175impl From<AppendErrorInternal> for AppendError {
176    fn from(e: AppendErrorInternal) -> Self {
177        match e {
178            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
179            AppendErrorInternal::StreamerMissingInActionError(e) => {
180                AppendError::StreamerMissingInActionError(e)
181            }
182            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
183            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
184            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
185            AppendErrorInternal::StreamEncryptedRecordLimitExceeded(e) => {
186                AppendError::StreamEncryptedRecordLimitExceeded(e)
187            }
188        }
189    }
190}
191
/// An append condition did not hold.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailedError {
    /// The fencing token supplied (`expected`) differs from the one in effect
    /// (`actual`).
    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
    FencingTokenMismatch {
        expected: FencingToken,
        actual: FencingToken,
        // Not part of the message; returned as-is by `durability_dependency`.
        applied_point: RangeTo<SeqNum>,
    },
    /// The sequence number the caller asked to match (`match_seq_num`) differs
    /// from the one that would be assigned (`assigned_seq_num`).
    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
    SeqNumMismatch {
        assigned_seq_num: SeqNum,
        match_seq_num: SeqNum,
    },
}
206
207impl AppendConditionFailedError {
208    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
209        use AppendConditionFailedError::*;
210        match self {
211            SeqNumMismatch {
212                assigned_seq_num, ..
213            } => ..*assigned_seq_num,
214            FencingTokenMismatch { applied_point, .. } => *applied_point,
215        }
216    }
217}
218
219impl From<StreamerError> for AppendError {
220    fn from(e: StreamerError) -> Self {
221        match e {
222            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
223            StreamerError::Storage(e) => Self::Storage(e),
224            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
225        }
226    }
227}
228
/// Error type for the read operation (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion from the
/// wrapped error type. Further `From` impls in this file route
/// `StreamerError`, `kv::DeserializationError` and `slatedb::Error` into
/// this enum.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReadError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
    #[error(transparent)]
    RecordDecryption(#[from] RecordDecryptionError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    #[error(transparent)]
    Unwritten(#[from] UnwrittenError),
}
252
253impl From<StreamerError> for ReadError {
254    fn from(e: StreamerError) -> Self {
255        match e {
256            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
257            StreamerError::Storage(e) => Self::Storage(e),
258            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
259        }
260    }
261}
262
263impl From<kv::DeserializationError> for ReadError {
264    fn from(e: kv::DeserializationError) -> Self {
265        Self::Storage(e.into())
266    }
267}
268
269impl From<slatedb::Error> for ReadError {
270    fn from(e: slatedb::Error) -> Self {
271        Self::Storage(e.into())
272    }
273}
274
/// Error type for listing streams (going by the type name); currently only
/// storage failures are represented.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListStreamsError {
    #[error(transparent)]
    Storage(#[from] StorageError),
}
280
281impl From<slatedb::Error> for ListStreamsError {
282    fn from(e: slatedb::Error) -> Self {
283        Self::Storage(e.into())
284    }
285}
286
287impl From<kv::DeserializationError> for ListStreamsError {
288    fn from(e: kv::DeserializationError) -> Self {
289        Self::Storage(e.into())
290    }
291}
292
/// Error type for creating a stream (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion from the
/// wrapped error type. `slatedb::Error` and `GetBasinConfigError` are mapped
/// into this enum by `From` impls defined in this file.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateStreamError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    #[error(transparent)]
    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    #[error(transparent)]
    Validation(#[from] s2_common::types::ValidationError),
}
310
311impl From<slatedb::Error> for CreateStreamError {
312    fn from(err: slatedb::Error) -> Self {
313        if err.kind() == slatedb::ErrorKind::Transaction {
314            Self::TransactionConflict(TransactionConflictError)
315        } else {
316            Self::Storage(err.into())
317        }
318    }
319}
320
321impl From<GetBasinConfigError> for CreateStreamError {
322    fn from(err: GetBasinConfigError) -> Self {
323        match err {
324            GetBasinConfigError::Storage(e) => Self::Storage(e),
325            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
326        }
327    }
328}
329
/// Error type for fetching a stream's configuration (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetStreamConfigError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
339
/// Error type for deleting a stream (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion. It is also
/// embedded in `BasinDeletionError` and `StreamDeleteOnEmptyError`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteStreamError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
}
351
352impl From<slatedb::Error> for DeleteStreamError {
353    fn from(err: slatedb::Error) -> Self {
354        Self::Storage(err.into())
355    }
356}
357
/// Error type for basin deletion (going by the type name): either a storage
/// failure or a failure deleting one of its streams.
#[derive(Debug, Clone, thiserror::Error)]
pub enum BasinDeletionError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    DeleteStream(#[from] DeleteStreamError),
}
365
/// Error type for the delete-on-empty handling of a stream (going by the type
/// name): either a storage failure or a stream-deletion failure.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StreamDeleteOnEmptyError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    DeleteStream(#[from] DeleteStreamError),
}
373
/// Error type for listing basins (going by the type name); currently only
/// storage failures are represented.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListBasinsError {
    #[error(transparent)]
    Storage(#[from] StorageError),
}
379
380impl From<slatedb::Error> for ListBasinsError {
381    fn from(err: slatedb::Error) -> Self {
382        Self::Storage(err.into())
383    }
384}
385
386impl From<kv::DeserializationError> for ListBasinsError {
387    fn from(e: kv::DeserializationError) -> Self {
388        Self::Storage(e.into())
389    }
390}
391
/// Error type for creating a basin (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateBasinError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
401
402impl From<slatedb::Error> for CreateBasinError {
403    fn from(err: slatedb::Error) -> Self {
404        Self::Storage(err.into())
405    }
406}
407
/// Error type for fetching a basin's configuration (going by the type name).
///
/// Both variants are transparent; a `From` impl in this file maps this enum
/// into `CreateStreamError`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetBasinConfigError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
415
/// Error type for reconfiguring a basin (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureBasinError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
427
428impl From<slatedb::Error> for ReconfigureBasinError {
429    fn from(err: slatedb::Error) -> Self {
430        if err.kind() == slatedb::ErrorKind::Transaction {
431            Self::TransactionConflict(TransactionConflictError)
432        } else {
433            Self::Storage(err.into())
434        }
435    }
436}
437
/// Error type for reconfiguring a stream (going by the type name).
///
/// Every variant is transparent and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureStreamError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    #[error(transparent)]
    Validation(#[from] s2_common::types::ValidationError),
}
453
454impl From<slatedb::Error> for ReconfigureStreamError {
455    fn from(err: slatedb::Error) -> Self {
456        if err.kind() == slatedb::ErrorKind::Transaction {
457            Self::TransactionConflict(TransactionConflictError)
458        } else {
459            Self::Storage(err.into())
460        }
461    }
462}
463
/// Error type for deleting a basin (going by the type name).
///
/// Both variants are transparent and have a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteBasinError {
    #[error(transparent)]
    Storage(#[from] StorageError),
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
471
472impl From<slatedb::Error> for DeleteBasinError {
473    fn from(err: slatedb::Error) -> Self {
474        Self::Storage(err.into())
475    }
476}