1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 encryption::EncryptionSpecResolutionError,
5 record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6 types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13 #[error("deserialization: {0}")]
14 Deserialization(#[from] kv::DeserializationError),
15 #[error("database: {0}")]
16 Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20 fn from(error: slatedb::Error) -> Self {
21 StorageError::Database(Arc::new(error))
22 }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28 pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34 pub basin: BasinName,
35 pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41 pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47 pub basin: BasinName,
48 pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54 pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream deletion pending")]
59pub struct StreamDeletionPendingError;
60
61#[derive(Debug, Clone, thiserror::Error)]
62#[error("unwritten position: {0}")]
63pub struct UnwrittenError(pub StreamPosition);
64
65#[derive(Debug, Clone, thiserror::Error)]
66#[error("streamer missing in action")]
67pub struct StreamerMissingInActionError;
68
69#[derive(Debug, Clone, thiserror::Error)]
70#[error("request dropped")]
71pub struct RequestDroppedError;
72
73#[derive(Debug, Clone, thiserror::Error)]
74#[error("record timestamp was required but was missing")]
75pub struct AppendTimestampRequiredError;
76
77#[derive(Debug, Clone, thiserror::Error)]
78#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
79pub struct MaxSeqNumError {
80 pub first_seq_num: SeqNum,
81 pub assigned_seq_num: SeqNum,
82 pub max_assignable_seq_num: SeqNum,
83}
84
85#[derive(Debug, Clone, thiserror::Error)]
86#[error("transaction conflict occurred – this is usually retriable")]
87pub struct TransactionConflictError;
88
89#[derive(Debug, Clone, thiserror::Error)]
90pub enum StreamerError {
91 #[error(transparent)]
92 Storage(#[from] StorageError),
93 #[error(transparent)]
94 StreamNotFound(#[from] StreamNotFoundError),
95 #[error(transparent)]
96 StreamDeletionPending(#[from] StreamDeletionPendingError),
97}
98
99#[derive(Debug, Clone, thiserror::Error)]
100pub(super) enum AppendErrorInternal {
101 #[error(transparent)]
102 Storage(#[from] StorageError),
103 #[error(transparent)]
104 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
105 #[error(transparent)]
106 RequestDroppedError(#[from] RequestDroppedError),
107 #[error(transparent)]
108 StreamDeletionPending(#[from] StreamDeletionPendingError),
109 #[error(transparent)]
110 ConditionFailed(#[from] AppendConditionFailedError),
111 #[error(transparent)]
112 TimestampMissing(#[from] AppendTimestampRequiredError),
113 #[error(transparent)]
114 MaxSeqNum(#[from] MaxSeqNumError),
115}
116
117impl AppendErrorInternal {
118 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
119 match self {
120 Self::ConditionFailed(e) => e.durability_dependency(),
121 Self::MaxSeqNum(e) => e.durability_dependency(),
122 _ => ..0,
123 }
124 }
125}
126
127#[derive(Debug, Clone, thiserror::Error)]
128pub enum CheckTailError {
129 #[error(transparent)]
130 Storage(#[from] StorageError),
131 #[error(transparent)]
132 TransactionConflict(#[from] TransactionConflictError),
133 #[error(transparent)]
134 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
135 #[error(transparent)]
136 BasinNotFound(#[from] BasinNotFoundError),
137 #[error(transparent)]
138 StreamNotFound(#[from] StreamNotFoundError),
139 #[error(transparent)]
140 BasinDeletionPending(#[from] BasinDeletionPendingError),
141 #[error(transparent)]
142 StreamDeletionPending(#[from] StreamDeletionPendingError),
143}
144
145impl From<StreamerError> for CheckTailError {
146 fn from(e: StreamerError) -> Self {
147 match e {
148 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
149 StreamerError::Storage(e) => Self::Storage(e),
150 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
151 }
152 }
153}
154
155#[derive(Debug, Clone, thiserror::Error)]
156pub enum AppendError {
157 #[error(transparent)]
158 Storage(#[from] StorageError),
159 #[error(transparent)]
160 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
161 #[error(transparent)]
162 TransactionConflict(#[from] TransactionConflictError),
163 #[error(transparent)]
164 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
165 #[error(transparent)]
166 RequestDroppedError(#[from] RequestDroppedError),
167 #[error(transparent)]
168 BasinNotFound(#[from] BasinNotFoundError),
169 #[error(transparent)]
170 StreamNotFound(#[from] StreamNotFoundError),
171 #[error(transparent)]
172 BasinDeletionPending(#[from] BasinDeletionPendingError),
173 #[error(transparent)]
174 StreamDeletionPending(#[from] StreamDeletionPendingError),
175 #[error(transparent)]
176 ConditionFailed(#[from] AppendConditionFailedError),
177 #[error(transparent)]
178 TimestampMissing(#[from] AppendTimestampRequiredError),
179 #[error(transparent)]
180 MaxSeqNum(#[from] MaxSeqNumError),
181}
182
183impl From<AppendErrorInternal> for AppendError {
184 fn from(e: AppendErrorInternal) -> Self {
185 match e {
186 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
187 AppendErrorInternal::StreamerMissingInActionError(e) => {
188 AppendError::StreamerMissingInActionError(e)
189 }
190 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
191 AppendErrorInternal::StreamDeletionPending(e) => AppendError::StreamDeletionPending(e),
192 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
193 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
194 AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
195 }
196 }
197}
198
199#[derive(Debug, Clone, thiserror::Error)]
200pub enum AppendConditionFailedError {
201 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
202 FencingTokenMismatch {
203 expected: FencingToken,
204 actual: FencingToken,
205 applied_point: RangeTo<SeqNum>,
206 },
207 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
208 SeqNumMismatch {
209 assigned_seq_num: SeqNum,
210 match_seq_num: SeqNum,
211 },
212}
213
214impl AppendConditionFailedError {
215 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
216 use AppendConditionFailedError::*;
217 match self {
218 SeqNumMismatch {
219 assigned_seq_num, ..
220 } => ..*assigned_seq_num,
221 FencingTokenMismatch { applied_point, .. } => *applied_point,
222 }
223 }
224}
225
226impl MaxSeqNumError {
227 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
228 ..self.first_seq_num
229 }
230}
231
232impl From<StreamerError> for AppendError {
233 fn from(e: StreamerError) -> Self {
234 match e {
235 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
236 StreamerError::Storage(e) => Self::Storage(e),
237 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
238 }
239 }
240}
241
242#[derive(Debug, Clone, thiserror::Error)]
243pub enum ReadError {
244 #[error(transparent)]
245 Storage(#[from] StorageError),
246 #[error(transparent)]
247 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
248 #[error(transparent)]
249 RecordDecryption(#[from] RecordDecryptionError),
250 #[error(transparent)]
251 TransactionConflict(#[from] TransactionConflictError),
252 #[error(transparent)]
253 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
254 #[error(transparent)]
255 BasinNotFound(#[from] BasinNotFoundError),
256 #[error(transparent)]
257 StreamNotFound(#[from] StreamNotFoundError),
258 #[error(transparent)]
259 BasinDeletionPending(#[from] BasinDeletionPendingError),
260 #[error(transparent)]
261 StreamDeletionPending(#[from] StreamDeletionPendingError),
262 #[error(transparent)]
263 Unwritten(#[from] UnwrittenError),
264}
265
266impl From<StreamerError> for ReadError {
267 fn from(e: StreamerError) -> Self {
268 match e {
269 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
270 StreamerError::Storage(e) => Self::Storage(e),
271 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
272 }
273 }
274}
275
276impl From<kv::DeserializationError> for ReadError {
277 fn from(e: kv::DeserializationError) -> Self {
278 Self::Storage(e.into())
279 }
280}
281
282impl From<slatedb::Error> for ReadError {
283 fn from(e: slatedb::Error) -> Self {
284 Self::Storage(e.into())
285 }
286}
287
288#[derive(Debug, Clone, thiserror::Error)]
289pub enum ListStreamsError {
290 #[error(transparent)]
291 Storage(#[from] StorageError),
292}
293
294impl From<slatedb::Error> for ListStreamsError {
295 fn from(e: slatedb::Error) -> Self {
296 Self::Storage(e.into())
297 }
298}
299
300impl From<kv::DeserializationError> for ListStreamsError {
301 fn from(e: kv::DeserializationError) -> Self {
302 Self::Storage(e.into())
303 }
304}
305
306#[derive(Debug, Clone, thiserror::Error)]
307pub enum ProvisionStreamError {
308 #[error(transparent)]
309 Storage(#[from] StorageError),
310 #[error(transparent)]
311 TransactionConflict(#[from] TransactionConflictError),
312 #[error(transparent)]
313 BasinNotFound(#[from] BasinNotFoundError),
314 #[error(transparent)]
315 BasinDeletionPending(#[from] BasinDeletionPendingError),
316 #[error(transparent)]
317 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
318 #[error(transparent)]
319 StreamDeletionPending(#[from] StreamDeletionPendingError),
320 #[error(transparent)]
321 Validation(#[from] s2_common::types::ValidationError),
322}
323
324impl From<slatedb::Error> for ProvisionStreamError {
325 fn from(err: slatedb::Error) -> Self {
326 if err.kind() == slatedb::ErrorKind::Transaction {
327 Self::TransactionConflict(TransactionConflictError)
328 } else {
329 Self::Storage(err.into())
330 }
331 }
332}
333
334impl From<GetBasinConfigError> for ProvisionStreamError {
335 fn from(err: GetBasinConfigError) -> Self {
336 match err {
337 GetBasinConfigError::Storage(e) => Self::Storage(e),
338 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
339 }
340 }
341}
342
343#[derive(Debug, Clone, thiserror::Error)]
344pub enum GetStreamConfigError {
345 #[error(transparent)]
346 Storage(#[from] StorageError),
347 #[error(transparent)]
348 StreamNotFound(#[from] StreamNotFoundError),
349 #[error(transparent)]
350 StreamDeletionPending(#[from] StreamDeletionPendingError),
351}
352
353#[derive(Debug, Clone, thiserror::Error)]
354pub enum DeleteStreamError {
355 #[error(transparent)]
356 Storage(#[from] StorageError),
357 #[error(transparent)]
358 TransactionConflict(#[from] TransactionConflictError),
359 #[error(transparent)]
360 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
361 #[error(transparent)]
362 RequestDroppedError(#[from] RequestDroppedError),
363 #[error(transparent)]
364 StreamNotFound(#[from] StreamNotFoundError),
365}
366
367impl From<slatedb::Error> for DeleteStreamError {
368 fn from(err: slatedb::Error) -> Self {
369 if err.kind() == slatedb::ErrorKind::Transaction {
370 Self::TransactionConflict(TransactionConflictError)
371 } else {
372 Self::Storage(err.into())
373 }
374 }
375}
376
377#[derive(Debug, Clone, thiserror::Error)]
378pub enum BasinDeletionError {
379 #[error(transparent)]
380 Storage(#[from] StorageError),
381 #[error(transparent)]
382 DeleteStream(#[from] DeleteStreamError),
383}
384
385#[derive(Debug, Clone, thiserror::Error)]
386pub enum StreamDeleteOnEmptyError {
387 #[error(transparent)]
388 Storage(#[from] StorageError),
389 #[error(transparent)]
390 DeleteStream(#[from] DeleteStreamError),
391}
392
393#[derive(Debug, Clone, thiserror::Error)]
394pub enum ListBasinsError {
395 #[error(transparent)]
396 Storage(#[from] StorageError),
397}
398
399impl From<slatedb::Error> for ListBasinsError {
400 fn from(err: slatedb::Error) -> Self {
401 Self::Storage(err.into())
402 }
403}
404
405impl From<kv::DeserializationError> for ListBasinsError {
406 fn from(e: kv::DeserializationError) -> Self {
407 Self::Storage(e.into())
408 }
409}
410
411#[derive(Debug, Clone, thiserror::Error)]
412pub enum ProvisionBasinError {
413 #[error(transparent)]
414 Storage(#[from] StorageError),
415 #[error(transparent)]
416 TransactionConflict(#[from] TransactionConflictError),
417 #[error(transparent)]
418 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
419 #[error(transparent)]
420 BasinDeletionPending(#[from] BasinDeletionPendingError),
421}
422
423impl From<slatedb::Error> for ProvisionBasinError {
424 fn from(err: slatedb::Error) -> Self {
425 if err.kind() == slatedb::ErrorKind::Transaction {
426 Self::TransactionConflict(TransactionConflictError)
427 } else {
428 Self::Storage(err.into())
429 }
430 }
431}
432
433#[derive(Debug, Clone, thiserror::Error)]
434pub enum GetBasinConfigError {
435 #[error(transparent)]
436 Storage(#[from] StorageError),
437 #[error(transparent)]
438 BasinNotFound(#[from] BasinNotFoundError),
439}
440
441#[derive(Debug, Clone, thiserror::Error)]
442pub enum ReconfigureBasinError {
443 #[error(transparent)]
444 Storage(#[from] StorageError),
445 #[error(transparent)]
446 TransactionConflict(#[from] TransactionConflictError),
447 #[error(transparent)]
448 BasinNotFound(#[from] BasinNotFoundError),
449 #[error(transparent)]
450 BasinDeletionPending(#[from] BasinDeletionPendingError),
451}
452
453impl From<slatedb::Error> for ReconfigureBasinError {
454 fn from(err: slatedb::Error) -> Self {
455 if err.kind() == slatedb::ErrorKind::Transaction {
456 Self::TransactionConflict(TransactionConflictError)
457 } else {
458 Self::Storage(err.into())
459 }
460 }
461}
462
463#[derive(Debug, Clone, thiserror::Error)]
464pub enum ReconfigureStreamError {
465 #[error(transparent)]
466 Storage(#[from] StorageError),
467 #[error(transparent)]
468 TransactionConflict(#[from] TransactionConflictError),
469 #[error(transparent)]
470 BasinNotFound(#[from] BasinNotFoundError),
471 #[error(transparent)]
472 BasinDeletionPending(#[from] BasinDeletionPendingError),
473 #[error(transparent)]
474 StreamNotFound(#[from] StreamNotFoundError),
475 #[error(transparent)]
476 StreamDeletionPending(#[from] StreamDeletionPendingError),
477 #[error(transparent)]
478 Validation(#[from] s2_common::types::ValidationError),
479}
480
481impl From<slatedb::Error> for ReconfigureStreamError {
482 fn from(err: slatedb::Error) -> Self {
483 if err.kind() == slatedb::ErrorKind::Transaction {
484 Self::TransactionConflict(TransactionConflictError)
485 } else {
486 Self::Storage(err.into())
487 }
488 }
489}
490
491#[derive(Debug, Clone, thiserror::Error)]
492pub enum DeleteBasinError {
493 #[error(transparent)]
494 Storage(#[from] StorageError),
495 #[error(transparent)]
496 TransactionConflict(#[from] TransactionConflictError),
497 #[error(transparent)]
498 BasinNotFound(#[from] BasinNotFoundError),
499}
500
501impl From<slatedb::Error> for DeleteBasinError {
502 fn from(err: slatedb::Error) -> Self {
503 if err.kind() == slatedb::ErrorKind::Transaction {
504 Self::TransactionConflict(TransactionConflictError)
505 } else {
506 Self::Storage(err.into())
507 }
508 }
509}