1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 basin::BasinName,
5 encryption::EncryptionSpecResolutionError,
6 record::{FencingToken, SeqNum, StreamPosition},
7 stream::StreamName,
8};
9use s2_storage::record::RecordDecryptionError;
10
11use crate::backend::kv;
12
13#[derive(Debug, Clone, thiserror::Error)]
14pub enum StorageError {
15 #[error("deserialization: {0}")]
16 Deserialization(#[from] kv::DeserializationError),
17 #[error("database: {0}")]
18 Database(Arc<slatedb::Error>),
19}
20
21impl From<slatedb::Error> for StorageError {
22 fn from(error: slatedb::Error) -> Self {
23 StorageError::Database(Arc::new(error))
24 }
25}
26
27#[derive(Debug, Clone, thiserror::Error)]
28#[error("basin `{basin}` not found")]
29pub struct BasinNotFoundError {
30 pub basin: BasinName,
31}
32
33#[derive(Debug, Clone, thiserror::Error)]
34#[error("stream `{stream}` in basin `{basin}` not found")]
35pub struct StreamNotFoundError {
36 pub basin: BasinName,
37 pub stream: StreamName,
38}
39
40#[derive(Debug, Clone, thiserror::Error)]
41#[error("basin `{basin}` already exists")]
42pub struct BasinAlreadyExistsError {
43 pub basin: BasinName,
44}
45
46#[derive(Debug, Clone, thiserror::Error)]
47#[error("stream `{stream}` in basin `{basin}` already exists")]
48pub struct StreamAlreadyExistsError {
49 pub basin: BasinName,
50 pub stream: StreamName,
51}
52
53#[derive(Debug, Clone, thiserror::Error)]
54#[error("basin `{basin}` is being deleted")]
55pub struct BasinDeletionPendingError {
56 pub basin: BasinName,
57}
58
59#[derive(Debug, Clone, thiserror::Error)]
60#[error("stream deletion pending")]
61pub struct StreamDeletionPendingError;
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
81pub struct MaxSeqNumError {
82 pub first_seq_num: SeqNum,
83 pub assigned_seq_num: SeqNum,
84 pub max_assignable_seq_num: SeqNum,
85}
86
87#[derive(Debug, Clone, thiserror::Error)]
88#[error("transaction conflict occurred – this is usually retriable")]
89pub struct TransactionConflictError;
90
91#[derive(Debug, Clone, thiserror::Error)]
92pub enum StreamerError {
93 #[error(transparent)]
94 Storage(#[from] StorageError),
95 #[error(transparent)]
96 StreamNotFound(#[from] StreamNotFoundError),
97 #[error(transparent)]
98 StreamDeletionPending(#[from] StreamDeletionPendingError),
99}
100
101#[derive(Debug, Clone, thiserror::Error)]
102pub(super) enum AppendErrorInternal {
103 #[error(transparent)]
104 Storage(#[from] StorageError),
105 #[error(transparent)]
106 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
107 #[error(transparent)]
108 RequestDroppedError(#[from] RequestDroppedError),
109 #[error(transparent)]
110 StreamDeletionPending(#[from] StreamDeletionPendingError),
111 #[error(transparent)]
112 ConditionFailed(#[from] AppendConditionFailedError),
113 #[error(transparent)]
114 TimestampMissing(#[from] AppendTimestampRequiredError),
115 #[error(transparent)]
116 MaxSeqNum(#[from] MaxSeqNumError),
117}
118
119impl AppendErrorInternal {
120 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
121 match self {
122 Self::ConditionFailed(e) => e.durability_dependency(),
123 Self::MaxSeqNum(e) => e.durability_dependency(),
124 _ => ..0,
125 }
126 }
127}
128
129#[derive(Debug, Clone, thiserror::Error)]
130pub enum CheckTailError {
131 #[error(transparent)]
132 Storage(#[from] StorageError),
133 #[error(transparent)]
134 TransactionConflict(#[from] TransactionConflictError),
135 #[error(transparent)]
136 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
137 #[error(transparent)]
138 BasinNotFound(#[from] BasinNotFoundError),
139 #[error(transparent)]
140 StreamNotFound(#[from] StreamNotFoundError),
141 #[error(transparent)]
142 BasinDeletionPending(#[from] BasinDeletionPendingError),
143 #[error(transparent)]
144 StreamDeletionPending(#[from] StreamDeletionPendingError),
145}
146
147impl From<StreamerError> for CheckTailError {
148 fn from(e: StreamerError) -> Self {
149 match e {
150 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
151 StreamerError::Storage(e) => Self::Storage(e),
152 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
153 }
154 }
155}
156
157#[derive(Debug, Clone, thiserror::Error)]
158pub enum AppendError {
159 #[error(transparent)]
160 Storage(#[from] StorageError),
161 #[error(transparent)]
162 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
163 #[error(transparent)]
164 TransactionConflict(#[from] TransactionConflictError),
165 #[error(transparent)]
166 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
167 #[error(transparent)]
168 RequestDroppedError(#[from] RequestDroppedError),
169 #[error(transparent)]
170 BasinNotFound(#[from] BasinNotFoundError),
171 #[error(transparent)]
172 StreamNotFound(#[from] StreamNotFoundError),
173 #[error(transparent)]
174 BasinDeletionPending(#[from] BasinDeletionPendingError),
175 #[error(transparent)]
176 StreamDeletionPending(#[from] StreamDeletionPendingError),
177 #[error(transparent)]
178 ConditionFailed(#[from] AppendConditionFailedError),
179 #[error(transparent)]
180 TimestampMissing(#[from] AppendTimestampRequiredError),
181 #[error(transparent)]
182 MaxSeqNum(#[from] MaxSeqNumError),
183}
184
185impl From<AppendErrorInternal> for AppendError {
186 fn from(e: AppendErrorInternal) -> Self {
187 match e {
188 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
189 AppendErrorInternal::StreamerMissingInActionError(e) => {
190 AppendError::StreamerMissingInActionError(e)
191 }
192 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
193 AppendErrorInternal::StreamDeletionPending(e) => AppendError::StreamDeletionPending(e),
194 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
195 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
196 AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
197 }
198 }
199}
200
201#[derive(Debug, Clone, thiserror::Error)]
202pub enum AppendConditionFailedError {
203 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
204 FencingTokenMismatch {
205 expected: FencingToken,
206 actual: FencingToken,
207 applied_point: RangeTo<SeqNum>,
208 },
209 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
210 SeqNumMismatch {
211 assigned_seq_num: SeqNum,
212 match_seq_num: SeqNum,
213 },
214}
215
216impl AppendConditionFailedError {
217 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
218 use AppendConditionFailedError::*;
219 match self {
220 SeqNumMismatch {
221 assigned_seq_num, ..
222 } => ..*assigned_seq_num,
223 FencingTokenMismatch { applied_point, .. } => *applied_point,
224 }
225 }
226}
227
228impl MaxSeqNumError {
229 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
230 ..self.first_seq_num
231 }
232}
233
234impl From<StreamerError> for AppendError {
235 fn from(e: StreamerError) -> Self {
236 match e {
237 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
238 StreamerError::Storage(e) => Self::Storage(e),
239 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
240 }
241 }
242}
243
244#[derive(Debug, Clone, thiserror::Error)]
245pub enum ReadError {
246 #[error(transparent)]
247 Storage(#[from] StorageError),
248 #[error(transparent)]
249 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
250 #[error(transparent)]
251 RecordDecryption(#[from] RecordDecryptionError),
252 #[error(transparent)]
253 TransactionConflict(#[from] TransactionConflictError),
254 #[error(transparent)]
255 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
256 #[error(transparent)]
257 BasinNotFound(#[from] BasinNotFoundError),
258 #[error(transparent)]
259 StreamNotFound(#[from] StreamNotFoundError),
260 #[error(transparent)]
261 BasinDeletionPending(#[from] BasinDeletionPendingError),
262 #[error(transparent)]
263 StreamDeletionPending(#[from] StreamDeletionPendingError),
264 #[error(transparent)]
265 Unwritten(#[from] UnwrittenError),
266}
267
268impl From<StreamerError> for ReadError {
269 fn from(e: StreamerError) -> Self {
270 match e {
271 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
272 StreamerError::Storage(e) => Self::Storage(e),
273 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
274 }
275 }
276}
277
278impl From<kv::DeserializationError> for ReadError {
279 fn from(e: kv::DeserializationError) -> Self {
280 Self::Storage(e.into())
281 }
282}
283
284impl From<slatedb::Error> for ReadError {
285 fn from(e: slatedb::Error) -> Self {
286 Self::Storage(e.into())
287 }
288}
289
290#[derive(Debug, Clone, thiserror::Error)]
291pub enum ListStreamsError {
292 #[error(transparent)]
293 Storage(#[from] StorageError),
294}
295
296impl From<slatedb::Error> for ListStreamsError {
297 fn from(e: slatedb::Error) -> Self {
298 Self::Storage(e.into())
299 }
300}
301
302impl From<kv::DeserializationError> for ListStreamsError {
303 fn from(e: kv::DeserializationError) -> Self {
304 Self::Storage(e.into())
305 }
306}
307
308#[derive(Debug, Clone, thiserror::Error)]
309pub enum ProvisionStreamError {
310 #[error(transparent)]
311 Storage(#[from] StorageError),
312 #[error(transparent)]
313 TransactionConflict(#[from] TransactionConflictError),
314 #[error(transparent)]
315 BasinNotFound(#[from] BasinNotFoundError),
316 #[error(transparent)]
317 BasinDeletionPending(#[from] BasinDeletionPendingError),
318 #[error(transparent)]
319 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
320 #[error(transparent)]
321 StreamDeletionPending(#[from] StreamDeletionPendingError),
322 #[error(transparent)]
323 Validation(#[from] s2_common::ValidationError),
324}
325
326impl From<slatedb::Error> for ProvisionStreamError {
327 fn from(err: slatedb::Error) -> Self {
328 if err.kind() == slatedb::ErrorKind::Transaction {
329 Self::TransactionConflict(TransactionConflictError)
330 } else {
331 Self::Storage(err.into())
332 }
333 }
334}
335
336impl From<GetBasinConfigError> for ProvisionStreamError {
337 fn from(err: GetBasinConfigError) -> Self {
338 match err {
339 GetBasinConfigError::Storage(e) => Self::Storage(e),
340 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
341 }
342 }
343}
344
345#[derive(Debug, Clone, thiserror::Error)]
346pub enum GetStreamConfigError {
347 #[error(transparent)]
348 Storage(#[from] StorageError),
349 #[error(transparent)]
350 StreamNotFound(#[from] StreamNotFoundError),
351 #[error(transparent)]
352 StreamDeletionPending(#[from] StreamDeletionPendingError),
353}
354
355#[derive(Debug, Clone, thiserror::Error)]
356pub enum DeleteStreamError {
357 #[error(transparent)]
358 Storage(#[from] StorageError),
359 #[error(transparent)]
360 TransactionConflict(#[from] TransactionConflictError),
361 #[error(transparent)]
362 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
363 #[error(transparent)]
364 RequestDroppedError(#[from] RequestDroppedError),
365 #[error(transparent)]
366 StreamNotFound(#[from] StreamNotFoundError),
367}
368
369impl From<slatedb::Error> for DeleteStreamError {
370 fn from(err: slatedb::Error) -> Self {
371 if err.kind() == slatedb::ErrorKind::Transaction {
372 Self::TransactionConflict(TransactionConflictError)
373 } else {
374 Self::Storage(err.into())
375 }
376 }
377}
378
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum BasinDeletionError {
381 #[error(transparent)]
382 Storage(#[from] StorageError),
383 #[error(transparent)]
384 DeleteStream(#[from] DeleteStreamError),
385}
386
387#[derive(Debug, Clone, thiserror::Error)]
388pub enum StreamDeleteOnEmptyError {
389 #[error(transparent)]
390 Storage(#[from] StorageError),
391 #[error(transparent)]
392 DeleteStream(#[from] DeleteStreamError),
393}
394
395#[derive(Debug, Clone, thiserror::Error)]
396pub enum ListBasinsError {
397 #[error(transparent)]
398 Storage(#[from] StorageError),
399}
400
401impl From<slatedb::Error> for ListBasinsError {
402 fn from(err: slatedb::Error) -> Self {
403 Self::Storage(err.into())
404 }
405}
406
407impl From<kv::DeserializationError> for ListBasinsError {
408 fn from(e: kv::DeserializationError) -> Self {
409 Self::Storage(e.into())
410 }
411}
412
413#[derive(Debug, Clone, thiserror::Error)]
414pub enum ProvisionBasinError {
415 #[error(transparent)]
416 Storage(#[from] StorageError),
417 #[error(transparent)]
418 TransactionConflict(#[from] TransactionConflictError),
419 #[error(transparent)]
420 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
421 #[error(transparent)]
422 BasinDeletionPending(#[from] BasinDeletionPendingError),
423}
424
425impl From<slatedb::Error> for ProvisionBasinError {
426 fn from(err: slatedb::Error) -> Self {
427 if err.kind() == slatedb::ErrorKind::Transaction {
428 Self::TransactionConflict(TransactionConflictError)
429 } else {
430 Self::Storage(err.into())
431 }
432 }
433}
434
435#[derive(Debug, Clone, thiserror::Error)]
436pub enum GetBasinConfigError {
437 #[error(transparent)]
438 Storage(#[from] StorageError),
439 #[error(transparent)]
440 BasinNotFound(#[from] BasinNotFoundError),
441}
442
443#[derive(Debug, Clone, thiserror::Error)]
444pub enum ReconfigureBasinError {
445 #[error(transparent)]
446 Storage(#[from] StorageError),
447 #[error(transparent)]
448 TransactionConflict(#[from] TransactionConflictError),
449 #[error(transparent)]
450 BasinNotFound(#[from] BasinNotFoundError),
451 #[error(transparent)]
452 BasinDeletionPending(#[from] BasinDeletionPendingError),
453}
454
455impl From<slatedb::Error> for ReconfigureBasinError {
456 fn from(err: slatedb::Error) -> Self {
457 if err.kind() == slatedb::ErrorKind::Transaction {
458 Self::TransactionConflict(TransactionConflictError)
459 } else {
460 Self::Storage(err.into())
461 }
462 }
463}
464
465#[derive(Debug, Clone, thiserror::Error)]
466pub enum ReconfigureStreamError {
467 #[error(transparent)]
468 Storage(#[from] StorageError),
469 #[error(transparent)]
470 TransactionConflict(#[from] TransactionConflictError),
471 #[error(transparent)]
472 BasinNotFound(#[from] BasinNotFoundError),
473 #[error(transparent)]
474 BasinDeletionPending(#[from] BasinDeletionPendingError),
475 #[error(transparent)]
476 StreamNotFound(#[from] StreamNotFoundError),
477 #[error(transparent)]
478 StreamDeletionPending(#[from] StreamDeletionPendingError),
479 #[error(transparent)]
480 Validation(#[from] s2_common::ValidationError),
481}
482
483impl From<slatedb::Error> for ReconfigureStreamError {
484 fn from(err: slatedb::Error) -> Self {
485 if err.kind() == slatedb::ErrorKind::Transaction {
486 Self::TransactionConflict(TransactionConflictError)
487 } else {
488 Self::Storage(err.into())
489 }
490 }
491}
492
493#[derive(Debug, Clone, thiserror::Error)]
494pub enum DeleteBasinError {
495 #[error(transparent)]
496 Storage(#[from] StorageError),
497 #[error(transparent)]
498 TransactionConflict(#[from] TransactionConflictError),
499 #[error(transparent)]
500 BasinNotFound(#[from] BasinNotFoundError),
501}
502
503impl From<slatedb::Error> for DeleteBasinError {
504 fn from(err: slatedb::Error) -> Self {
505 if err.kind() == slatedb::ErrorKind::Transaction {
506 Self::TransactionConflict(TransactionConflictError)
507 } else {
508 Self::Storage(err.into())
509 }
510 }
511}