// s2_lite/backend/error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    encryption::{EncryptionAlgorithm, EncryptionSpecResolutionError},
5    record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6    types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13    #[error("deserialization: {0}")]
14    Deserialization(#[from] kv::DeserializationError),
15    #[error("database: {0}")]
16    Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20    fn from(error: slatedb::Error) -> Self {
21        StorageError::Database(Arc::new(error))
22    }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28    pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34    pub basin: BasinName,
35    pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41    pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47    pub basin: BasinName,
48    pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54    pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60    pub basin: BasinName,
61    pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error("record encryption algorithm mismatch")]
82pub struct EncryptionAlgorithmMismatchError {
83    pub expected: Option<EncryptionAlgorithm>,
84    pub actual: Option<EncryptionAlgorithm>,
85}
86
87#[derive(Debug, Clone, thiserror::Error)]
88#[error("transaction conflict occurred – this is usually retriable")]
89pub struct TransactionConflictError;
90
91#[derive(Debug, Clone, thiserror::Error)]
92pub enum StreamerError {
93    #[error(transparent)]
94    Storage(#[from] StorageError),
95    #[error(transparent)]
96    StreamNotFound(#[from] StreamNotFoundError),
97    #[error(transparent)]
98    StreamDeletionPending(#[from] StreamDeletionPendingError),
99}
100
101#[derive(Debug, Clone, thiserror::Error)]
102pub(super) enum AppendErrorInternal {
103    #[error(transparent)]
104    Storage(#[from] StorageError),
105    #[error(transparent)]
106    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
107    #[error(transparent)]
108    RequestDroppedError(#[from] RequestDroppedError),
109    #[error(transparent)]
110    ConditionFailed(#[from] AppendConditionFailedError),
111    #[error(transparent)]
112    TimestampMissing(#[from] AppendTimestampRequiredError),
113    #[error(transparent)]
114    EncryptionAlgorithmMismatch(#[from] EncryptionAlgorithmMismatchError),
115}
116
117#[derive(Debug, Clone, thiserror::Error)]
118pub enum CheckTailError {
119    #[error(transparent)]
120    Storage(#[from] StorageError),
121    #[error(transparent)]
122    TransactionConflict(#[from] TransactionConflictError),
123    #[error(transparent)]
124    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
125    #[error(transparent)]
126    BasinNotFound(#[from] BasinNotFoundError),
127    #[error(transparent)]
128    StreamNotFound(#[from] StreamNotFoundError),
129    #[error(transparent)]
130    BasinDeletionPending(#[from] BasinDeletionPendingError),
131    #[error(transparent)]
132    StreamDeletionPending(#[from] StreamDeletionPendingError),
133}
134
135impl From<StreamerError> for CheckTailError {
136    fn from(e: StreamerError) -> Self {
137        match e {
138            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
139            StreamerError::Storage(e) => Self::Storage(e),
140            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
141        }
142    }
143}
144
145#[derive(Debug, Clone, thiserror::Error)]
146pub enum AppendError {
147    #[error(transparent)]
148    Storage(#[from] StorageError),
149    #[error(transparent)]
150    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
151    #[error(transparent)]
152    TransactionConflict(#[from] TransactionConflictError),
153    #[error(transparent)]
154    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
155    #[error(transparent)]
156    RequestDroppedError(#[from] RequestDroppedError),
157    #[error(transparent)]
158    BasinNotFound(#[from] BasinNotFoundError),
159    #[error(transparent)]
160    StreamNotFound(#[from] StreamNotFoundError),
161    #[error(transparent)]
162    BasinDeletionPending(#[from] BasinDeletionPendingError),
163    #[error(transparent)]
164    StreamDeletionPending(#[from] StreamDeletionPendingError),
165    #[error(transparent)]
166    ConditionFailed(#[from] AppendConditionFailedError),
167    #[error(transparent)]
168    TimestampMissing(#[from] AppendTimestampRequiredError),
169    #[error(transparent)]
170    EncryptionAlgorithmMismatch(#[from] EncryptionAlgorithmMismatchError),
171}
172
173impl From<AppendErrorInternal> for AppendError {
174    fn from(e: AppendErrorInternal) -> Self {
175        match e {
176            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
177            AppendErrorInternal::StreamerMissingInActionError(e) => {
178                AppendError::StreamerMissingInActionError(e)
179            }
180            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
181            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
182            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
183            AppendErrorInternal::EncryptionAlgorithmMismatch(e) => {
184                AppendError::EncryptionAlgorithmMismatch(e)
185            }
186        }
187    }
188}
189
190#[derive(Debug, Clone, thiserror::Error)]
191pub enum AppendConditionFailedError {
192    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
193    FencingTokenMismatch {
194        expected: FencingToken,
195        actual: FencingToken,
196        applied_point: RangeTo<SeqNum>,
197    },
198    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
199    SeqNumMismatch {
200        assigned_seq_num: SeqNum,
201        match_seq_num: SeqNum,
202    },
203}
204
205impl AppendConditionFailedError {
206    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
207        use AppendConditionFailedError::*;
208        match self {
209            SeqNumMismatch {
210                assigned_seq_num, ..
211            } => ..*assigned_seq_num,
212            FencingTokenMismatch { applied_point, .. } => *applied_point,
213        }
214    }
215}
216
217impl From<StreamerError> for AppendError {
218    fn from(e: StreamerError) -> Self {
219        match e {
220            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
221            StreamerError::Storage(e) => Self::Storage(e),
222            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
223        }
224    }
225}
226
227#[derive(Debug, Clone, thiserror::Error)]
228pub enum ReadError {
229    #[error(transparent)]
230    Storage(#[from] StorageError),
231    #[error(transparent)]
232    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
233    #[error(transparent)]
234    RecordDecryption(#[from] RecordDecryptionError),
235    #[error(transparent)]
236    TransactionConflict(#[from] TransactionConflictError),
237    #[error(transparent)]
238    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
239    #[error(transparent)]
240    BasinNotFound(#[from] BasinNotFoundError),
241    #[error(transparent)]
242    StreamNotFound(#[from] StreamNotFoundError),
243    #[error(transparent)]
244    BasinDeletionPending(#[from] BasinDeletionPendingError),
245    #[error(transparent)]
246    StreamDeletionPending(#[from] StreamDeletionPendingError),
247    #[error(transparent)]
248    Unwritten(#[from] UnwrittenError),
249}
250
251impl From<StreamerError> for ReadError {
252    fn from(e: StreamerError) -> Self {
253        match e {
254            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
255            StreamerError::Storage(e) => Self::Storage(e),
256            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
257        }
258    }
259}
260
261impl From<kv::DeserializationError> for ReadError {
262    fn from(e: kv::DeserializationError) -> Self {
263        Self::Storage(e.into())
264    }
265}
266
267impl From<slatedb::Error> for ReadError {
268    fn from(e: slatedb::Error) -> Self {
269        Self::Storage(e.into())
270    }
271}
272
273#[derive(Debug, Clone, thiserror::Error)]
274pub enum ListStreamsError {
275    #[error(transparent)]
276    Storage(#[from] StorageError),
277}
278
279impl From<slatedb::Error> for ListStreamsError {
280    fn from(e: slatedb::Error) -> Self {
281        Self::Storage(e.into())
282    }
283}
284
285impl From<kv::DeserializationError> for ListStreamsError {
286    fn from(e: kv::DeserializationError) -> Self {
287        Self::Storage(e.into())
288    }
289}
290
291#[derive(Debug, Clone, thiserror::Error)]
292pub enum CreateStreamError {
293    #[error(transparent)]
294    Storage(#[from] StorageError),
295    #[error(transparent)]
296    TransactionConflict(#[from] TransactionConflictError),
297    #[error(transparent)]
298    BasinNotFound(#[from] BasinNotFoundError),
299    #[error(transparent)]
300    BasinDeletionPending(#[from] BasinDeletionPendingError),
301    #[error(transparent)]
302    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
303    #[error(transparent)]
304    StreamDeletionPending(#[from] StreamDeletionPendingError),
305    #[error(transparent)]
306    Validation(#[from] s2_common::types::ValidationError),
307}
308
309impl From<slatedb::Error> for CreateStreamError {
310    fn from(err: slatedb::Error) -> Self {
311        if err.kind() == slatedb::ErrorKind::Transaction {
312            Self::TransactionConflict(TransactionConflictError)
313        } else {
314            Self::Storage(err.into())
315        }
316    }
317}
318
319impl From<GetBasinConfigError> for CreateStreamError {
320    fn from(err: GetBasinConfigError) -> Self {
321        match err {
322            GetBasinConfigError::Storage(e) => Self::Storage(e),
323            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
324        }
325    }
326}
327
328#[derive(Debug, Clone, thiserror::Error)]
329pub enum GetStreamConfigError {
330    #[error(transparent)]
331    Storage(#[from] StorageError),
332    #[error(transparent)]
333    StreamNotFound(#[from] StreamNotFoundError),
334    #[error(transparent)]
335    StreamDeletionPending(#[from] StreamDeletionPendingError),
336}
337
338#[derive(Debug, Clone, thiserror::Error)]
339pub enum DeleteStreamError {
340    #[error(transparent)]
341    Storage(#[from] StorageError),
342    #[error(transparent)]
343    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
344    #[error(transparent)]
345    RequestDroppedError(#[from] RequestDroppedError),
346    #[error(transparent)]
347    StreamNotFound(#[from] StreamNotFoundError),
348}
349
350impl From<slatedb::Error> for DeleteStreamError {
351    fn from(err: slatedb::Error) -> Self {
352        Self::Storage(err.into())
353    }
354}
355
356#[derive(Debug, Clone, thiserror::Error)]
357pub enum BasinDeletionError {
358    #[error(transparent)]
359    Storage(#[from] StorageError),
360    #[error(transparent)]
361    DeleteStream(#[from] DeleteStreamError),
362}
363
364#[derive(Debug, Clone, thiserror::Error)]
365pub enum StreamDeleteOnEmptyError {
366    #[error(transparent)]
367    Storage(#[from] StorageError),
368    #[error(transparent)]
369    DeleteStream(#[from] DeleteStreamError),
370}
371
372#[derive(Debug, Clone, thiserror::Error)]
373pub enum ListBasinsError {
374    #[error(transparent)]
375    Storage(#[from] StorageError),
376}
377
378impl From<slatedb::Error> for ListBasinsError {
379    fn from(err: slatedb::Error) -> Self {
380        Self::Storage(err.into())
381    }
382}
383
384impl From<kv::DeserializationError> for ListBasinsError {
385    fn from(e: kv::DeserializationError) -> Self {
386        Self::Storage(e.into())
387    }
388}
389
390#[derive(Debug, Clone, thiserror::Error)]
391pub enum CreateBasinError {
392    #[error(transparent)]
393    Storage(#[from] StorageError),
394    #[error(transparent)]
395    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
396    #[error(transparent)]
397    BasinDeletionPending(#[from] BasinDeletionPendingError),
398}
399
400impl From<slatedb::Error> for CreateBasinError {
401    fn from(err: slatedb::Error) -> Self {
402        Self::Storage(err.into())
403    }
404}
405
406#[derive(Debug, Clone, thiserror::Error)]
407pub enum GetBasinConfigError {
408    #[error(transparent)]
409    Storage(#[from] StorageError),
410    #[error(transparent)]
411    BasinNotFound(#[from] BasinNotFoundError),
412}
413
414#[derive(Debug, Clone, thiserror::Error)]
415pub enum ReconfigureBasinError {
416    #[error(transparent)]
417    Storage(#[from] StorageError),
418    #[error(transparent)]
419    TransactionConflict(#[from] TransactionConflictError),
420    #[error(transparent)]
421    BasinNotFound(#[from] BasinNotFoundError),
422    #[error(transparent)]
423    BasinDeletionPending(#[from] BasinDeletionPendingError),
424}
425
426impl From<slatedb::Error> for ReconfigureBasinError {
427    fn from(err: slatedb::Error) -> Self {
428        if err.kind() == slatedb::ErrorKind::Transaction {
429            Self::TransactionConflict(TransactionConflictError)
430        } else {
431            Self::Storage(err.into())
432        }
433    }
434}
435
436#[derive(Debug, Clone, thiserror::Error)]
437pub enum ReconfigureStreamError {
438    #[error(transparent)]
439    Storage(#[from] StorageError),
440    #[error(transparent)]
441    TransactionConflict(#[from] TransactionConflictError),
442    #[error(transparent)]
443    BasinNotFound(#[from] BasinNotFoundError),
444    #[error(transparent)]
445    StreamNotFound(#[from] StreamNotFoundError),
446    #[error(transparent)]
447    StreamDeletionPending(#[from] StreamDeletionPendingError),
448    #[error(transparent)]
449    Validation(#[from] s2_common::types::ValidationError),
450}
451
452impl From<slatedb::Error> for ReconfigureStreamError {
453    fn from(err: slatedb::Error) -> Self {
454        if err.kind() == slatedb::ErrorKind::Transaction {
455            Self::TransactionConflict(TransactionConflictError)
456        } else {
457            Self::Storage(err.into())
458        }
459    }
460}
461
462#[derive(Debug, Clone, thiserror::Error)]
463pub enum DeleteBasinError {
464    #[error(transparent)]
465    Storage(#[from] StorageError),
466    #[error(transparent)]
467    BasinNotFound(#[from] BasinNotFoundError),
468}
469
470impl From<slatedb::Error> for DeleteBasinError {
471    fn from(err: slatedb::Error) -> Self {
472        Self::Storage(err.into())
473    }
474}