s2_lite/backend/
error.rs

1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4    record::{FencingToken, SeqNum, StreamPosition},
5    types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
/// Failure raised by the storage layer.
///
/// Covers two sources: a stored value that could not be decoded, and an error
/// reported by SlateDB itself.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StorageError {
    /// A value could not be deserialized; built automatically from
    /// `kv::DeserializationError` via `#[from]`.
    #[error("deserialization: {0}")]
    Deserialization(#[from] kv::DeserializationError),
    /// An error reported by SlateDB. The error is held behind an `Arc`, so
    /// cloning this enum only bumps a reference count.
    #[error("database: {0}")]
    Database(Arc<slatedb::Error>),
}
17
18impl From<slatedb::Error> for StorageError {
19    fn from(error: slatedb::Error) -> Self {
20        StorageError::Database(Arc::new(error))
21    }
22}
23
/// Error stating that the named basin was not found.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` not found")]
pub struct BasinNotFoundError {
    /// Name of the basin that was not found; interpolated into the message.
    pub basin: BasinName,
}
29
/// Error stating that the named stream was not found in the named basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` not found")]
pub struct StreamNotFoundError {
    /// Basin in which the stream was looked for.
    pub basin: BasinName,
    /// Name of the stream that was not found.
    pub stream: StreamName,
}
36
/// Error stating that a basin with the given name already exists.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` already exists")]
pub struct BasinAlreadyExistsError {
    /// Name of the basin that already exists; interpolated into the message.
    pub basin: BasinName,
}
42
/// Error stating that a stream with the given name already exists in the
/// given basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` already exists")]
pub struct StreamAlreadyExistsError {
    /// Basin that contains the existing stream.
    pub basin: BasinName,
    /// Name of the stream that already exists.
    pub stream: StreamName,
}
49
/// Error stating that the named basin is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` is being deleted")]
pub struct BasinDeletionPendingError {
    /// Name of the basin whose deletion is pending.
    pub basin: BasinName,
}
55
/// Error stating that the named stream is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` is being deleted")]
pub struct StreamDeletionPendingError {
    /// Basin that contains the stream.
    pub basin: BasinName,
    /// Name of the stream whose deletion is pending.
    pub stream: StreamName,
}
62
/// Error reporting an unwritten position; the wrapped `StreamPosition` is
/// rendered in the message.
///
/// NOTE(review): which position is carried (e.g. the current tail) is decided
/// by the code that constructs this error — confirm at the call sites.
#[derive(Debug, Clone, thiserror::Error)]
#[error("unwritten position: {0}")]
pub struct UnwrittenError(pub StreamPosition);
66
/// Unit error whose message is "streamer missing in action".
///
/// NOTE(review): presumably raised when the streamer serving a stream can no
/// longer be reached — confirm against the code that constructs it.
#[derive(Debug, Clone, thiserror::Error)]
#[error("streamer missing in action")]
pub struct StreamerMissingInActionError;
70
/// Unit error whose message is "request dropped".
///
/// NOTE(review): presumably raised when a request is discarded before a
/// response is produced — confirm against the code that constructs it.
#[derive(Debug, Clone, thiserror::Error)]
#[error("request dropped")]
pub struct RequestDroppedError;
74
/// Unit error stating that a record timestamp was required but was missing.
#[derive(Debug, Clone, thiserror::Error)]
#[error("record timestamp was required but was missing")]
pub struct AppendTimestampRequiredError;
78
/// Unit error signalling a transaction conflict; per its message this is
/// usually retriable.
///
/// Within this file it is produced by the `From<slatedb::Error>` conversions
/// that detect `slatedb::ErrorKind::Transaction`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("transaction conflict occurred – this is usually retriable")]
pub struct TransactionConflictError;
82
/// Error type visible only to the parent module (`pub(super)`).
///
/// This file converts it into `CheckTailError`, `AppendError` and `ReadError`
/// through `From` impls. Every variant is `transparent`, so `Display` and
/// `source()` are forwarded to the wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub(super) enum StreamerError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
92
/// Append error type visible only to the parent module (`pub(super)`).
///
/// This file converts it into the public `AppendError` through a `From` impl.
/// Every variant is `transparent`, so `Display` and `source()` are forwarded
/// to the wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub(super) enum AppendErrorInternal {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Wraps `StreamerMissingInActionError`.
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// Wraps `RequestDroppedError`.
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// An append condition (fencing token or sequence number) did not hold.
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    /// A record timestamp was required but missing.
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
}
106
/// Public error type for the check-tail operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion, so `?` can lift any of the wrapped
/// errors into this enum.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CheckTailError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// Wraps `StreamerMissingInActionError`.
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
124
125impl From<StreamerError> for CheckTailError {
126    fn from(e: StreamerError) -> Self {
127        match e {
128            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
129            StreamerError::Storage(e) => Self::Storage(e),
130            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
131        }
132    }
133}
134
/// Public error type for the append operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion. This file also provides conversions
/// from `AppendErrorInternal` and `StreamerError`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// Wraps `StreamerMissingInActionError`.
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// Wraps `RequestDroppedError`.
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// An append condition (fencing token or sequence number) did not hold.
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    /// A record timestamp was required but missing.
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
}
158
159impl From<AppendErrorInternal> for AppendError {
160    fn from(e: AppendErrorInternal) -> Self {
161        match e {
162            AppendErrorInternal::Storage(e) => AppendError::Storage(e),
163            AppendErrorInternal::StreamerMissingInActionError(e) => {
164                AppendError::StreamerMissingInActionError(e)
165            }
166            AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
167            AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
168            AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
169        }
170    }
171}
172
/// Reason an append's precondition did not hold.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailedError {
    /// The fencing token did not match.
    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
    FencingTokenMismatch {
        /// Token rendered as "expected" in the message.
        expected: FencingToken,
        /// Token rendered as "actual" in the message.
        actual: FencingToken,
        /// Range returned as-is by `durability_dependency`; not part of the
        /// message.
        // NOTE(review): presumably the point at which the current fencing
        // token was applied — confirm with the code that constructs this.
        applied_point: RangeTo<SeqNum>,
    },
    /// The sequence number did not match.
    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
    SeqNumMismatch {
        /// Sequence number rendered as "actual" in the message; also the
        /// exclusive upper bound returned by `durability_dependency`.
        assigned_seq_num: SeqNum,
        /// Sequence number rendered as "expected" in the message.
        match_seq_num: SeqNum,
    },
}
187
188impl AppendConditionFailedError {
189    pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
190        use AppendConditionFailedError::*;
191        match self {
192            SeqNumMismatch {
193                assigned_seq_num, ..
194            } => ..*assigned_seq_num,
195            FencingTokenMismatch { applied_point, .. } => *applied_point,
196        }
197    }
198}
199
200impl From<StreamerError> for AppendError {
201    fn from(e: StreamerError) -> Self {
202        match e {
203            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
204            StreamerError::Storage(e) => Self::Storage(e),
205            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
206        }
207    }
208}
209
/// Public error type for the read operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion. This file also provides conversions
/// from `StreamerError`, `kv::DeserializationError` and `slatedb::Error`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReadError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// Wraps `StreamerMissingInActionError`.
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// Wraps `UnwrittenError`, which carries an unwritten stream position.
    #[error(transparent)]
    Unwritten(#[from] UnwrittenError),
}
229
230impl From<StreamerError> for ReadError {
231    fn from(e: StreamerError) -> Self {
232        match e {
233            StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
234            StreamerError::Storage(e) => Self::Storage(e),
235            StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
236        }
237    }
238}
239
240impl From<kv::DeserializationError> for ReadError {
241    fn from(e: kv::DeserializationError) -> Self {
242        Self::Storage(e.into())
243    }
244}
245
246impl From<slatedb::Error> for ReadError {
247    fn from(e: slatedb::Error) -> Self {
248        Self::Storage(e.into())
249    }
250}
251
/// Public error type for the list-streams operation; its only variant is a
/// storage failure.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListStreamsError {
    /// Storage-layer failure; message and source come from `StorageError`.
    #[error(transparent)]
    Storage(#[from] StorageError),
}
257
258impl From<slatedb::Error> for ListStreamsError {
259    fn from(e: slatedb::Error) -> Self {
260        Self::Storage(e.into())
261    }
262}
263
264impl From<kv::DeserializationError> for ListStreamsError {
265    fn from(e: kv::DeserializationError) -> Self {
266        Self::Storage(e.into())
267    }
268}
269
/// Public error type for the create-stream operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion. This file also provides conversions
/// from `slatedb::Error` and `GetBasinConfigError`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateStreamError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// A stream with that name already exists.
    #[error(transparent)]
    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
285
286impl From<slatedb::Error> for CreateStreamError {
287    fn from(err: slatedb::Error) -> Self {
288        if err.kind() == slatedb::ErrorKind::Transaction {
289            Self::TransactionConflict(TransactionConflictError)
290        } else {
291            Self::Storage(err.into())
292        }
293    }
294}
295
296impl From<GetBasinConfigError> for CreateStreamError {
297    fn from(err: GetBasinConfigError) -> Self {
298        match err {
299            GetBasinConfigError::Storage(e) => Self::Storage(e),
300            GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
301        }
302    }
303}
304
/// Public error type for the get-stream-config operation.
///
/// Both variants are `transparent` and have `#[from]` conversions.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetStreamConfigError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
}
312
/// Public error type for the delete-stream operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteStreamError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Wraps `StreamerMissingInActionError`.
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// Wraps `RequestDroppedError`.
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
}
324
325impl From<slatedb::Error> for DeleteStreamError {
326    fn from(err: slatedb::Error) -> Self {
327        Self::Storage(err.into())
328    }
329}
330
/// Public error type for the list-basins operation; its only variant is a
/// storage failure.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListBasinsError {
    /// Storage-layer failure; message and source come from `StorageError`.
    #[error(transparent)]
    Storage(#[from] StorageError),
}
336
337impl From<slatedb::Error> for ListBasinsError {
338    fn from(err: slatedb::Error) -> Self {
339        Self::Storage(err.into())
340    }
341}
342
343impl From<kv::DeserializationError> for ListBasinsError {
344    fn from(e: kv::DeserializationError) -> Self {
345        Self::Storage(e.into())
346    }
347}
348
/// Public error type for the create-basin operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateBasinError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A basin with that name already exists.
    #[error(transparent)]
    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
358
359impl From<slatedb::Error> for CreateBasinError {
360    fn from(err: slatedb::Error) -> Self {
361        Self::Storage(err.into())
362    }
363}
364
/// Public error type for the get-basin-config operation.
///
/// Both variants are `transparent` and have `#[from]` conversions. This file
/// also converts this type into `CreateStreamError`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetBasinConfigError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
372
/// Public error type for the reconfigure-basin operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureBasinError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
384
385impl From<slatedb::Error> for ReconfigureBasinError {
386    fn from(err: slatedb::Error) -> Self {
387        if err.kind() == slatedb::ErrorKind::Transaction {
388            Self::TransactionConflict(TransactionConflictError)
389        } else {
390            Self::Storage(err.into())
391        }
392    }
393}
394
/// Public error type for the reconfigure-stream operation.
///
/// Every variant is `transparent` (message and source come from the wrapped
/// error) and has a `#[from]` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureStreamError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
406
407impl From<slatedb::Error> for ReconfigureStreamError {
408    fn from(err: slatedb::Error) -> Self {
409        if err.kind() == slatedb::ErrorKind::Transaction {
410            Self::TransactionConflict(TransactionConflictError)
411        } else {
412            Self::Storage(err.into())
413        }
414    }
415}
416
/// Public error type for the delete-basin operation.
///
/// Both variants are `transparent` and have `#[from]` conversions.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteBasinError {
    /// Storage-layer failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
424
425impl From<slatedb::Error> for DeleteBasinError {
426    fn from(err: slatedb::Error) -> Self {
427        Self::Storage(err.into())
428    }
429}