Skip to main content

s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    record::{FencingToken, SeqNum, StreamPosition},
5    types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12    #[error("deserialization: {0}")]
13    Deserialization(#[from] kv::DeserializationError),
14    #[error("database: {0}")]
15    Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19    fn from(error: slatedb::Error) -> Self {
20        StorageError::Database(Arc::new(error))
21    }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27    pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33    pub basin: BasinName,
34    pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40    pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46    pub basin: BasinName,
47    pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53    pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59    pub basin: BasinName,
60    pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85    #[error(transparent)]
86    Storage(#[from] StorageError),
87    #[error(transparent)]
88    StreamNotFound(#[from] StreamNotFoundError),
89    #[error(transparent)]
90    StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95    #[error(transparent)]
96    Storage(#[from] StorageError),
97    #[error(transparent)]
98    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
99    #[error(transparent)]
100    RequestDroppedError(#[from] RequestDroppedError),
101    #[error(transparent)]
102    ConditionFailed(#[from] AppendConditionFailedError),
103    #[error(transparent)]
104    TimestampMissing(#[from] AppendTimestampRequiredError),
105}
106
107#[derive(Debug, Clone, thiserror::Error)]
108pub enum CheckTailError {
109    #[error(transparent)]
110    Storage(#[from] StorageError),
111    #[error(transparent)]
112    TransactionConflict(#[from] TransactionConflictError),
113    #[error(transparent)]
114    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
115    #[error(transparent)]
116    BasinNotFound(#[from] BasinNotFoundError),
117    #[error(transparent)]
118    StreamNotFound(#[from] StreamNotFoundError),
119    #[error(transparent)]
120    BasinDeletionPending(#[from] BasinDeletionPendingError),
121    #[error(transparent)]
122    StreamDeletionPending(#[from] StreamDeletionPendingError),
123}
124
125impl From<StreamerError> for CheckTailError {
126    fn from(e: StreamerError) -> Self {
127        match e {
128            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
129            StreamerError::Storage(e) => Self::Storage(e),
130            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
131        }
132    }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136pub enum AppendError {
137    #[error(transparent)]
138    Storage(#[from] StorageError),
139    #[error(transparent)]
140    TransactionConflict(#[from] TransactionConflictError),
141    #[error(transparent)]
142    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
143    #[error(transparent)]
144    RequestDroppedError(#[from] RequestDroppedError),
145    #[error(transparent)]
146    BasinNotFound(#[from] BasinNotFoundError),
147    #[error(transparent)]
148    StreamNotFound(#[from] StreamNotFoundError),
149    #[error(transparent)]
150    BasinDeletionPending(#[from] BasinDeletionPendingError),
151    #[error(transparent)]
152    StreamDeletionPending(#[from] StreamDeletionPendingError),
153    #[error(transparent)]
154    ConditionFailed(#[from] AppendConditionFailedError),
155    #[error(transparent)]
156    TimestampMissing(#[from] AppendTimestampRequiredError),
157}
158
159impl From<AppendErrorInternal> for AppendError {
160    fn from(e: AppendErrorInternal) -> Self {
161        match e {
162            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
163            AppendErrorInternal::StreamerMissingInActionError(e) => {
164                AppendError::StreamerMissingInActionError(e)
165            }
166            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
167            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
168            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
169        }
170    }
171}
172
173#[derive(Debug, Clone, thiserror::Error)]
174pub enum AppendConditionFailedError {
175    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
176    FencingTokenMismatch {
177        expected: FencingToken,
178        actual: FencingToken,
179        applied_point: RangeTo<SeqNum>,
180    },
181    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
182    SeqNumMismatch {
183        assigned_seq_num: SeqNum,
184        match_seq_num: SeqNum,
185    },
186}
187
188impl AppendConditionFailedError {
189    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
190        use AppendConditionFailedError::*;
191        match self {
192            SeqNumMismatch {
193                assigned_seq_num, ..
194            } => ..*assigned_seq_num,
195            FencingTokenMismatch { applied_point, .. } => *applied_point,
196        }
197    }
198}
199
200impl From<StreamerError> for AppendError {
201    fn from(e: StreamerError) -> Self {
202        match e {
203            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
204            StreamerError::Storage(e) => Self::Storage(e),
205            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
206        }
207    }
208}
209
210#[derive(Debug, Clone, thiserror::Error)]
211pub enum ReadError {
212    #[error(transparent)]
213    Storage(#[from] StorageError),
214    #[error(transparent)]
215    TransactionConflict(#[from] TransactionConflictError),
216    #[error(transparent)]
217    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
218    #[error(transparent)]
219    BasinNotFound(#[from] BasinNotFoundError),
220    #[error(transparent)]
221    StreamNotFound(#[from] StreamNotFoundError),
222    #[error(transparent)]
223    BasinDeletionPending(#[from] BasinDeletionPendingError),
224    #[error(transparent)]
225    StreamDeletionPending(#[from] StreamDeletionPendingError),
226    #[error(transparent)]
227    Unwritten(#[from] UnwrittenError),
228}
229
230impl From<StreamerError> for ReadError {
231    fn from(e: StreamerError) -> Self {
232        match e {
233            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
234            StreamerError::Storage(e) => Self::Storage(e),
235            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
236        }
237    }
238}
239
240impl From<kv::DeserializationError> for ReadError {
241    fn from(e: kv::DeserializationError) -> Self {
242        Self::Storage(e.into())
243    }
244}
245
246impl From<slatedb::Error> for ReadError {
247    fn from(e: slatedb::Error) -> Self {
248        Self::Storage(e.into())
249    }
250}
251
252#[derive(Debug, Clone, thiserror::Error)]
253pub enum ListStreamsError {
254    #[error(transparent)]
255    Storage(#[from] StorageError),
256}
257
258impl From<slatedb::Error> for ListStreamsError {
259    fn from(e: slatedb::Error) -> Self {
260        Self::Storage(e.into())
261    }
262}
263
264impl From<kv::DeserializationError> for ListStreamsError {
265    fn from(e: kv::DeserializationError) -> Self {
266        Self::Storage(e.into())
267    }
268}
269
270#[derive(Debug, Clone, thiserror::Error)]
271pub enum CreateStreamError {
272    #[error(transparent)]
273    Storage(#[from] StorageError),
274    #[error(transparent)]
275    TransactionConflict(#[from] TransactionConflictError),
276    #[error(transparent)]
277    BasinNotFound(#[from] BasinNotFoundError),
278    #[error(transparent)]
279    BasinDeletionPending(#[from] BasinDeletionPendingError),
280    #[error(transparent)]
281    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
282    #[error(transparent)]
283    StreamDeletionPending(#[from] StreamDeletionPendingError),
284}
285
286impl From<slatedb::Error> for CreateStreamError {
287    fn from(err: slatedb::Error) -> Self {
288        if err.kind() == slatedb::ErrorKind::Transaction {
289            Self::TransactionConflict(TransactionConflictError)
290        } else {
291            Self::Storage(err.into())
292        }
293    }
294}
295
296impl From<GetBasinConfigError> for CreateStreamError {
297    fn from(err: GetBasinConfigError) -> Self {
298        match err {
299            GetBasinConfigError::Storage(e) => Self::Storage(e),
300            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
301        }
302    }
303}
304
305#[derive(Debug, Clone, thiserror::Error)]
306pub enum GetStreamConfigError {
307    #[error(transparent)]
308    Storage(#[from] StorageError),
309    #[error(transparent)]
310    StreamNotFound(#[from] StreamNotFoundError),
311}
312
313#[derive(Debug, Clone, thiserror::Error)]
314pub enum DeleteStreamError {
315    #[error(transparent)]
316    Storage(#[from] StorageError),
317    #[error(transparent)]
318    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
319    #[error(transparent)]
320    RequestDroppedError(#[from] RequestDroppedError),
321    #[error(transparent)]
322    StreamNotFound(#[from] StreamNotFoundError),
323}
324
325impl From<slatedb::Error> for DeleteStreamError {
326    fn from(err: slatedb::Error) -> Self {
327        Self::Storage(err.into())
328    }
329}
330
331#[derive(Debug, Clone, thiserror::Error)]
332pub enum BasinDeletionError {
333    #[error(transparent)]
334    Storage(#[from] StorageError),
335    #[error(transparent)]
336    DeleteStream(#[from] DeleteStreamError),
337}
338
339#[derive(Debug, Clone, thiserror::Error)]
340pub enum StreamDeleteOnEmptyError {
341    #[error(transparent)]
342    Storage(#[from] StorageError),
343    #[error(transparent)]
344    DeleteStream(#[from] DeleteStreamError),
345}
346
347#[derive(Debug, Clone, thiserror::Error)]
348pub enum ListBasinsError {
349    #[error(transparent)]
350    Storage(#[from] StorageError),
351}
352
353impl From<slatedb::Error> for ListBasinsError {
354    fn from(err: slatedb::Error) -> Self {
355        Self::Storage(err.into())
356    }
357}
358
359impl From<kv::DeserializationError> for ListBasinsError {
360    fn from(e: kv::DeserializationError) -> Self {
361        Self::Storage(e.into())
362    }
363}
364
365#[derive(Debug, Clone, thiserror::Error)]
366pub enum CreateBasinError {
367    #[error(transparent)]
368    Storage(#[from] StorageError),
369    #[error(transparent)]
370    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
371    #[error(transparent)]
372    BasinDeletionPending(#[from] BasinDeletionPendingError),
373}
374
375impl From<slatedb::Error> for CreateBasinError {
376    fn from(err: slatedb::Error) -> Self {
377        Self::Storage(err.into())
378    }
379}
380
381#[derive(Debug, Clone, thiserror::Error)]
382pub enum GetBasinConfigError {
383    #[error(transparent)]
384    Storage(#[from] StorageError),
385    #[error(transparent)]
386    BasinNotFound(#[from] BasinNotFoundError),
387}
388
389#[derive(Debug, Clone, thiserror::Error)]
390pub enum ReconfigureBasinError {
391    #[error(transparent)]
392    Storage(#[from] StorageError),
393    #[error(transparent)]
394    TransactionConflict(#[from] TransactionConflictError),
395    #[error(transparent)]
396    BasinNotFound(#[from] BasinNotFoundError),
397    #[error(transparent)]
398    BasinDeletionPending(#[from] BasinDeletionPendingError),
399}
400
401impl From<slatedb::Error> for ReconfigureBasinError {
402    fn from(err: slatedb::Error) -> Self {
403        if err.kind() == slatedb::ErrorKind::Transaction {
404            Self::TransactionConflict(TransactionConflictError)
405        } else {
406            Self::Storage(err.into())
407        }
408    }
409}
410
411#[derive(Debug, Clone, thiserror::Error)]
412pub enum ReconfigureStreamError {
413    #[error(transparent)]
414    Storage(#[from] StorageError),
415    #[error(transparent)]
416    TransactionConflict(#[from] TransactionConflictError),
417    #[error(transparent)]
418    StreamNotFound(#[from] StreamNotFoundError),
419    #[error(transparent)]
420    StreamDeletionPending(#[from] StreamDeletionPendingError),
421}
422
423impl From<slatedb::Error> for ReconfigureStreamError {
424    fn from(err: slatedb::Error) -> Self {
425        if err.kind() == slatedb::ErrorKind::Transaction {
426            Self::TransactionConflict(TransactionConflictError)
427        } else {
428            Self::Storage(err.into())
429        }
430    }
431}
432
433#[derive(Debug, Clone, thiserror::Error)]
434pub enum DeleteBasinError {
435    #[error(transparent)]
436    Storage(#[from] StorageError),
437    #[error(transparent)]
438    BasinNotFound(#[from] BasinNotFoundError),
439}
440
441impl From<slatedb::Error> for DeleteBasinError {
442    fn from(err: slatedb::Error) -> Self {
443        Self::Storage(err.into())
444    }
445}