1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 encryption::{EncryptionAlgorithm, EncryptionSpecResolutionError},
5 record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6 types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13 #[error("deserialization: {0}")]
14 Deserialization(#[from] kv::DeserializationError),
15 #[error("database: {0}")]
16 Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20 fn from(error: slatedb::Error) -> Self {
21 StorageError::Database(Arc::new(error))
22 }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28 pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34 pub basin: BasinName,
35 pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41 pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47 pub basin: BasinName,
48 pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54 pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60 pub basin: BasinName,
61 pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error("record encryption algorithm mismatch")]
82pub struct EncryptionAlgorithmMismatchError {
83 pub expected: Option<EncryptionAlgorithm>,
84 pub actual: Option<EncryptionAlgorithm>,
85}
86
87#[derive(Debug, Clone, thiserror::Error)]
88#[error("transaction conflict occurred – this is usually retriable")]
89pub struct TransactionConflictError;
90
91#[derive(Debug, Clone, thiserror::Error)]
92pub enum StreamerError {
93 #[error(transparent)]
94 Storage(#[from] StorageError),
95 #[error(transparent)]
96 StreamNotFound(#[from] StreamNotFoundError),
97 #[error(transparent)]
98 StreamDeletionPending(#[from] StreamDeletionPendingError),
99}
100
101#[derive(Debug, Clone, thiserror::Error)]
102pub(super) enum AppendErrorInternal {
103 #[error(transparent)]
104 Storage(#[from] StorageError),
105 #[error(transparent)]
106 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
107 #[error(transparent)]
108 RequestDroppedError(#[from] RequestDroppedError),
109 #[error(transparent)]
110 ConditionFailed(#[from] AppendConditionFailedError),
111 #[error(transparent)]
112 TimestampMissing(#[from] AppendTimestampRequiredError),
113 #[error(transparent)]
114 EncryptionAlgorithmMismatch(#[from] EncryptionAlgorithmMismatchError),
115}
116
117#[derive(Debug, Clone, thiserror::Error)]
118pub enum CheckTailError {
119 #[error(transparent)]
120 Storage(#[from] StorageError),
121 #[error(transparent)]
122 TransactionConflict(#[from] TransactionConflictError),
123 #[error(transparent)]
124 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
125 #[error(transparent)]
126 BasinNotFound(#[from] BasinNotFoundError),
127 #[error(transparent)]
128 StreamNotFound(#[from] StreamNotFoundError),
129 #[error(transparent)]
130 BasinDeletionPending(#[from] BasinDeletionPendingError),
131 #[error(transparent)]
132 StreamDeletionPending(#[from] StreamDeletionPendingError),
133}
134
135impl From<StreamerError> for CheckTailError {
136 fn from(e: StreamerError) -> Self {
137 match e {
138 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
139 StreamerError::Storage(e) => Self::Storage(e),
140 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
141 }
142 }
143}
144
145#[derive(Debug, Clone, thiserror::Error)]
146pub enum AppendError {
147 #[error(transparent)]
148 Storage(#[from] StorageError),
149 #[error(transparent)]
150 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
151 #[error(transparent)]
152 TransactionConflict(#[from] TransactionConflictError),
153 #[error(transparent)]
154 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
155 #[error(transparent)]
156 RequestDroppedError(#[from] RequestDroppedError),
157 #[error(transparent)]
158 BasinNotFound(#[from] BasinNotFoundError),
159 #[error(transparent)]
160 StreamNotFound(#[from] StreamNotFoundError),
161 #[error(transparent)]
162 BasinDeletionPending(#[from] BasinDeletionPendingError),
163 #[error(transparent)]
164 StreamDeletionPending(#[from] StreamDeletionPendingError),
165 #[error(transparent)]
166 ConditionFailed(#[from] AppendConditionFailedError),
167 #[error(transparent)]
168 TimestampMissing(#[from] AppendTimestampRequiredError),
169 #[error(transparent)]
170 EncryptionAlgorithmMismatch(#[from] EncryptionAlgorithmMismatchError),
171}
172
173impl From<AppendErrorInternal> for AppendError {
174 fn from(e: AppendErrorInternal) -> Self {
175 match e {
176 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
177 AppendErrorInternal::StreamerMissingInActionError(e) => {
178 AppendError::StreamerMissingInActionError(e)
179 }
180 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
181 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
182 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
183 AppendErrorInternal::EncryptionAlgorithmMismatch(e) => {
184 AppendError::EncryptionAlgorithmMismatch(e)
185 }
186 }
187 }
188}
189
190#[derive(Debug, Clone, thiserror::Error)]
191pub enum AppendConditionFailedError {
192 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
193 FencingTokenMismatch {
194 expected: FencingToken,
195 actual: FencingToken,
196 applied_point: RangeTo<SeqNum>,
197 },
198 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
199 SeqNumMismatch {
200 assigned_seq_num: SeqNum,
201 match_seq_num: SeqNum,
202 },
203}
204
205impl AppendConditionFailedError {
206 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
207 use AppendConditionFailedError::*;
208 match self {
209 SeqNumMismatch {
210 assigned_seq_num, ..
211 } => ..*assigned_seq_num,
212 FencingTokenMismatch { applied_point, .. } => *applied_point,
213 }
214 }
215}
216
217impl From<StreamerError> for AppendError {
218 fn from(e: StreamerError) -> Self {
219 match e {
220 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
221 StreamerError::Storage(e) => Self::Storage(e),
222 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
223 }
224 }
225}
226
227#[derive(Debug, Clone, thiserror::Error)]
228pub enum ReadError {
229 #[error(transparent)]
230 Storage(#[from] StorageError),
231 #[error(transparent)]
232 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
233 #[error(transparent)]
234 RecordDecryption(#[from] RecordDecryptionError),
235 #[error(transparent)]
236 TransactionConflict(#[from] TransactionConflictError),
237 #[error(transparent)]
238 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
239 #[error(transparent)]
240 BasinNotFound(#[from] BasinNotFoundError),
241 #[error(transparent)]
242 StreamNotFound(#[from] StreamNotFoundError),
243 #[error(transparent)]
244 BasinDeletionPending(#[from] BasinDeletionPendingError),
245 #[error(transparent)]
246 StreamDeletionPending(#[from] StreamDeletionPendingError),
247 #[error(transparent)]
248 Unwritten(#[from] UnwrittenError),
249}
250
251impl From<StreamerError> for ReadError {
252 fn from(e: StreamerError) -> Self {
253 match e {
254 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
255 StreamerError::Storage(e) => Self::Storage(e),
256 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
257 }
258 }
259}
260
261impl From<kv::DeserializationError> for ReadError {
262 fn from(e: kv::DeserializationError) -> Self {
263 Self::Storage(e.into())
264 }
265}
266
267impl From<slatedb::Error> for ReadError {
268 fn from(e: slatedb::Error) -> Self {
269 Self::Storage(e.into())
270 }
271}
272
273#[derive(Debug, Clone, thiserror::Error)]
274pub enum ListStreamsError {
275 #[error(transparent)]
276 Storage(#[from] StorageError),
277}
278
279impl From<slatedb::Error> for ListStreamsError {
280 fn from(e: slatedb::Error) -> Self {
281 Self::Storage(e.into())
282 }
283}
284
285impl From<kv::DeserializationError> for ListStreamsError {
286 fn from(e: kv::DeserializationError) -> Self {
287 Self::Storage(e.into())
288 }
289}
290
291#[derive(Debug, Clone, thiserror::Error)]
292pub enum CreateStreamError {
293 #[error(transparent)]
294 Storage(#[from] StorageError),
295 #[error(transparent)]
296 TransactionConflict(#[from] TransactionConflictError),
297 #[error(transparent)]
298 BasinNotFound(#[from] BasinNotFoundError),
299 #[error(transparent)]
300 BasinDeletionPending(#[from] BasinDeletionPendingError),
301 #[error(transparent)]
302 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
303 #[error(transparent)]
304 StreamDeletionPending(#[from] StreamDeletionPendingError),
305 #[error(transparent)]
306 Validation(#[from] s2_common::types::ValidationError),
307}
308
309impl From<slatedb::Error> for CreateStreamError {
310 fn from(err: slatedb::Error) -> Self {
311 if err.kind() == slatedb::ErrorKind::Transaction {
312 Self::TransactionConflict(TransactionConflictError)
313 } else {
314 Self::Storage(err.into())
315 }
316 }
317}
318
319impl From<GetBasinConfigError> for CreateStreamError {
320 fn from(err: GetBasinConfigError) -> Self {
321 match err {
322 GetBasinConfigError::Storage(e) => Self::Storage(e),
323 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
324 }
325 }
326}
327
328#[derive(Debug, Clone, thiserror::Error)]
329pub enum GetStreamConfigError {
330 #[error(transparent)]
331 Storage(#[from] StorageError),
332 #[error(transparent)]
333 StreamNotFound(#[from] StreamNotFoundError),
334 #[error(transparent)]
335 StreamDeletionPending(#[from] StreamDeletionPendingError),
336}
337
338#[derive(Debug, Clone, thiserror::Error)]
339pub enum DeleteStreamError {
340 #[error(transparent)]
341 Storage(#[from] StorageError),
342 #[error(transparent)]
343 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
344 #[error(transparent)]
345 RequestDroppedError(#[from] RequestDroppedError),
346 #[error(transparent)]
347 StreamNotFound(#[from] StreamNotFoundError),
348}
349
350impl From<slatedb::Error> for DeleteStreamError {
351 fn from(err: slatedb::Error) -> Self {
352 Self::Storage(err.into())
353 }
354}
355
356#[derive(Debug, Clone, thiserror::Error)]
357pub enum BasinDeletionError {
358 #[error(transparent)]
359 Storage(#[from] StorageError),
360 #[error(transparent)]
361 DeleteStream(#[from] DeleteStreamError),
362}
363
364#[derive(Debug, Clone, thiserror::Error)]
365pub enum StreamDeleteOnEmptyError {
366 #[error(transparent)]
367 Storage(#[from] StorageError),
368 #[error(transparent)]
369 DeleteStream(#[from] DeleteStreamError),
370}
371
372#[derive(Debug, Clone, thiserror::Error)]
373pub enum ListBasinsError {
374 #[error(transparent)]
375 Storage(#[from] StorageError),
376}
377
378impl From<slatedb::Error> for ListBasinsError {
379 fn from(err: slatedb::Error) -> Self {
380 Self::Storage(err.into())
381 }
382}
383
384impl From<kv::DeserializationError> for ListBasinsError {
385 fn from(e: kv::DeserializationError) -> Self {
386 Self::Storage(e.into())
387 }
388}
389
390#[derive(Debug, Clone, thiserror::Error)]
391pub enum CreateBasinError {
392 #[error(transparent)]
393 Storage(#[from] StorageError),
394 #[error(transparent)]
395 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
396 #[error(transparent)]
397 BasinDeletionPending(#[from] BasinDeletionPendingError),
398}
399
400impl From<slatedb::Error> for CreateBasinError {
401 fn from(err: slatedb::Error) -> Self {
402 Self::Storage(err.into())
403 }
404}
405
406#[derive(Debug, Clone, thiserror::Error)]
407pub enum GetBasinConfigError {
408 #[error(transparent)]
409 Storage(#[from] StorageError),
410 #[error(transparent)]
411 BasinNotFound(#[from] BasinNotFoundError),
412}
413
414#[derive(Debug, Clone, thiserror::Error)]
415pub enum ReconfigureBasinError {
416 #[error(transparent)]
417 Storage(#[from] StorageError),
418 #[error(transparent)]
419 TransactionConflict(#[from] TransactionConflictError),
420 #[error(transparent)]
421 BasinNotFound(#[from] BasinNotFoundError),
422 #[error(transparent)]
423 BasinDeletionPending(#[from] BasinDeletionPendingError),
424}
425
426impl From<slatedb::Error> for ReconfigureBasinError {
427 fn from(err: slatedb::Error) -> Self {
428 if err.kind() == slatedb::ErrorKind::Transaction {
429 Self::TransactionConflict(TransactionConflictError)
430 } else {
431 Self::Storage(err.into())
432 }
433 }
434}
435
436#[derive(Debug, Clone, thiserror::Error)]
437pub enum ReconfigureStreamError {
438 #[error(transparent)]
439 Storage(#[from] StorageError),
440 #[error(transparent)]
441 TransactionConflict(#[from] TransactionConflictError),
442 #[error(transparent)]
443 BasinNotFound(#[from] BasinNotFoundError),
444 #[error(transparent)]
445 StreamNotFound(#[from] StreamNotFoundError),
446 #[error(transparent)]
447 StreamDeletionPending(#[from] StreamDeletionPendingError),
448 #[error(transparent)]
449 Validation(#[from] s2_common::types::ValidationError),
450}
451
452impl From<slatedb::Error> for ReconfigureStreamError {
453 fn from(err: slatedb::Error) -> Self {
454 if err.kind() == slatedb::ErrorKind::Transaction {
455 Self::TransactionConflict(TransactionConflictError)
456 } else {
457 Self::Storage(err.into())
458 }
459 }
460}
461
462#[derive(Debug, Clone, thiserror::Error)]
463pub enum DeleteBasinError {
464 #[error(transparent)]
465 Storage(#[from] StorageError),
466 #[error(transparent)]
467 BasinNotFound(#[from] BasinNotFoundError),
468}
469
470impl From<slatedb::Error> for DeleteBasinError {
471 fn from(err: slatedb::Error) -> Self {
472 Self::Storage(err.into())
473 }
474}