1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 encryption::EncryptionSpecResolutionError,
5 record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6 types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13 #[error("deserialization: {0}")]
14 Deserialization(#[from] kv::DeserializationError),
15 #[error("database: {0}")]
16 Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20 fn from(error: slatedb::Error) -> Self {
21 StorageError::Database(Arc::new(error))
22 }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28 pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34 pub basin: BasinName,
35 pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41 pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47 pub basin: BasinName,
48 pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54 pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60 pub basin: BasinName,
61 pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error("max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}")]
82pub struct MaxSeqNumError {
83 pub first_seq_num: SeqNum,
84 pub assigned_seq_num: SeqNum,
85 pub max_assignable_seq_num: SeqNum,
86}
87
88#[derive(Debug, Clone, thiserror::Error)]
89#[error("transaction conflict occurred – this is usually retriable")]
90pub struct TransactionConflictError;
91
92#[derive(Debug, Clone, thiserror::Error)]
93pub enum StreamerError {
94 #[error(transparent)]
95 Storage(#[from] StorageError),
96 #[error(transparent)]
97 StreamNotFound(#[from] StreamNotFoundError),
98 #[error(transparent)]
99 StreamDeletionPending(#[from] StreamDeletionPendingError),
100}
101
102#[derive(Debug, Clone, thiserror::Error)]
103pub(super) enum AppendErrorInternal {
104 #[error(transparent)]
105 Storage(#[from] StorageError),
106 #[error(transparent)]
107 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
108 #[error(transparent)]
109 RequestDroppedError(#[from] RequestDroppedError),
110 #[error(transparent)]
111 ConditionFailed(#[from] AppendConditionFailedError),
112 #[error(transparent)]
113 TimestampMissing(#[from] AppendTimestampRequiredError),
114 #[error(transparent)]
115 MaxSeqNum(#[from] MaxSeqNumError),
116}
117
118impl AppendErrorInternal {
119 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
120 match self {
121 Self::ConditionFailed(e) => e.durability_dependency(),
122 Self::MaxSeqNum(e) => e.durability_dependency(),
123 _ => ..0,
124 }
125 }
126}
127
128#[derive(Debug, Clone, thiserror::Error)]
129pub enum CheckTailError {
130 #[error(transparent)]
131 Storage(#[from] StorageError),
132 #[error(transparent)]
133 TransactionConflict(#[from] TransactionConflictError),
134 #[error(transparent)]
135 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
136 #[error(transparent)]
137 BasinNotFound(#[from] BasinNotFoundError),
138 #[error(transparent)]
139 StreamNotFound(#[from] StreamNotFoundError),
140 #[error(transparent)]
141 BasinDeletionPending(#[from] BasinDeletionPendingError),
142 #[error(transparent)]
143 StreamDeletionPending(#[from] StreamDeletionPendingError),
144}
145
146impl From<StreamerError> for CheckTailError {
147 fn from(e: StreamerError) -> Self {
148 match e {
149 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
150 StreamerError::Storage(e) => Self::Storage(e),
151 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
152 }
153 }
154}
155
156#[derive(Debug, Clone, thiserror::Error)]
157pub enum AppendError {
158 #[error(transparent)]
159 Storage(#[from] StorageError),
160 #[error(transparent)]
161 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
162 #[error(transparent)]
163 TransactionConflict(#[from] TransactionConflictError),
164 #[error(transparent)]
165 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
166 #[error(transparent)]
167 RequestDroppedError(#[from] RequestDroppedError),
168 #[error(transparent)]
169 BasinNotFound(#[from] BasinNotFoundError),
170 #[error(transparent)]
171 StreamNotFound(#[from] StreamNotFoundError),
172 #[error(transparent)]
173 BasinDeletionPending(#[from] BasinDeletionPendingError),
174 #[error(transparent)]
175 StreamDeletionPending(#[from] StreamDeletionPendingError),
176 #[error(transparent)]
177 ConditionFailed(#[from] AppendConditionFailedError),
178 #[error(transparent)]
179 TimestampMissing(#[from] AppendTimestampRequiredError),
180 #[error(transparent)]
181 MaxSeqNum(#[from] MaxSeqNumError),
182}
183
184impl From<AppendErrorInternal> for AppendError {
185 fn from(e: AppendErrorInternal) -> Self {
186 match e {
187 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
188 AppendErrorInternal::StreamerMissingInActionError(e) => {
189 AppendError::StreamerMissingInActionError(e)
190 }
191 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
192 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
193 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
194 AppendErrorInternal::MaxSeqNum(e) => AppendError::MaxSeqNum(e),
195 }
196 }
197}
198
199#[derive(Debug, Clone, thiserror::Error)]
200pub enum AppendConditionFailedError {
201 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
202 FencingTokenMismatch {
203 expected: FencingToken,
204 actual: FencingToken,
205 applied_point: RangeTo<SeqNum>,
206 },
207 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
208 SeqNumMismatch {
209 assigned_seq_num: SeqNum,
210 match_seq_num: SeqNum,
211 },
212}
213
214impl AppendConditionFailedError {
215 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
216 use AppendConditionFailedError::*;
217 match self {
218 SeqNumMismatch {
219 assigned_seq_num, ..
220 } => ..*assigned_seq_num,
221 FencingTokenMismatch { applied_point, .. } => *applied_point,
222 }
223 }
224}
225
226impl MaxSeqNumError {
227 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
228 ..self.first_seq_num
229 }
230}
231
232impl From<StreamerError> for AppendError {
233 fn from(e: StreamerError) -> Self {
234 match e {
235 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
236 StreamerError::Storage(e) => Self::Storage(e),
237 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
238 }
239 }
240}
241
242#[derive(Debug, Clone, thiserror::Error)]
243pub enum ReadError {
244 #[error(transparent)]
245 Storage(#[from] StorageError),
246 #[error(transparent)]
247 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
248 #[error(transparent)]
249 RecordDecryption(#[from] RecordDecryptionError),
250 #[error(transparent)]
251 TransactionConflict(#[from] TransactionConflictError),
252 #[error(transparent)]
253 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
254 #[error(transparent)]
255 BasinNotFound(#[from] BasinNotFoundError),
256 #[error(transparent)]
257 StreamNotFound(#[from] StreamNotFoundError),
258 #[error(transparent)]
259 BasinDeletionPending(#[from] BasinDeletionPendingError),
260 #[error(transparent)]
261 StreamDeletionPending(#[from] StreamDeletionPendingError),
262 #[error(transparent)]
263 Unwritten(#[from] UnwrittenError),
264}
265
266impl From<StreamerError> for ReadError {
267 fn from(e: StreamerError) -> Self {
268 match e {
269 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
270 StreamerError::Storage(e) => Self::Storage(e),
271 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
272 }
273 }
274}
275
276impl From<kv::DeserializationError> for ReadError {
277 fn from(e: kv::DeserializationError) -> Self {
278 Self::Storage(e.into())
279 }
280}
281
282impl From<slatedb::Error> for ReadError {
283 fn from(e: slatedb::Error) -> Self {
284 Self::Storage(e.into())
285 }
286}
287
288#[derive(Debug, Clone, thiserror::Error)]
289pub enum ListStreamsError {
290 #[error(transparent)]
291 Storage(#[from] StorageError),
292}
293
294impl From<slatedb::Error> for ListStreamsError {
295 fn from(e: slatedb::Error) -> Self {
296 Self::Storage(e.into())
297 }
298}
299
300impl From<kv::DeserializationError> for ListStreamsError {
301 fn from(e: kv::DeserializationError) -> Self {
302 Self::Storage(e.into())
303 }
304}
305
306#[derive(Debug, Clone, thiserror::Error)]
307pub enum CreateStreamError {
308 #[error(transparent)]
309 Storage(#[from] StorageError),
310 #[error(transparent)]
311 TransactionConflict(#[from] TransactionConflictError),
312 #[error(transparent)]
313 BasinNotFound(#[from] BasinNotFoundError),
314 #[error(transparent)]
315 BasinDeletionPending(#[from] BasinDeletionPendingError),
316 #[error(transparent)]
317 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
318 #[error(transparent)]
319 StreamDeletionPending(#[from] StreamDeletionPendingError),
320 #[error(transparent)]
321 Validation(#[from] s2_common::types::ValidationError),
322}
323
324impl From<slatedb::Error> for CreateStreamError {
325 fn from(err: slatedb::Error) -> Self {
326 if err.kind() == slatedb::ErrorKind::Transaction {
327 Self::TransactionConflict(TransactionConflictError)
328 } else {
329 Self::Storage(err.into())
330 }
331 }
332}
333
334impl From<GetBasinConfigError> for CreateStreamError {
335 fn from(err: GetBasinConfigError) -> Self {
336 match err {
337 GetBasinConfigError::Storage(e) => Self::Storage(e),
338 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
339 }
340 }
341}
342
343#[derive(Debug, Clone, thiserror::Error)]
344pub enum GetStreamConfigError {
345 #[error(transparent)]
346 Storage(#[from] StorageError),
347 #[error(transparent)]
348 StreamNotFound(#[from] StreamNotFoundError),
349 #[error(transparent)]
350 StreamDeletionPending(#[from] StreamDeletionPendingError),
351}
352
353#[derive(Debug, Clone, thiserror::Error)]
354pub enum DeleteStreamError {
355 #[error(transparent)]
356 Storage(#[from] StorageError),
357 #[error(transparent)]
358 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
359 #[error(transparent)]
360 RequestDroppedError(#[from] RequestDroppedError),
361 #[error(transparent)]
362 StreamNotFound(#[from] StreamNotFoundError),
363}
364
365impl From<slatedb::Error> for DeleteStreamError {
366 fn from(err: slatedb::Error) -> Self {
367 Self::Storage(err.into())
368 }
369}
370
371#[derive(Debug, Clone, thiserror::Error)]
372pub enum BasinDeletionError {
373 #[error(transparent)]
374 Storage(#[from] StorageError),
375 #[error(transparent)]
376 DeleteStream(#[from] DeleteStreamError),
377}
378
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum StreamDeleteOnEmptyError {
381 #[error(transparent)]
382 Storage(#[from] StorageError),
383 #[error(transparent)]
384 DeleteStream(#[from] DeleteStreamError),
385}
386
387#[derive(Debug, Clone, thiserror::Error)]
388pub enum ListBasinsError {
389 #[error(transparent)]
390 Storage(#[from] StorageError),
391}
392
393impl From<slatedb::Error> for ListBasinsError {
394 fn from(err: slatedb::Error) -> Self {
395 Self::Storage(err.into())
396 }
397}
398
399impl From<kv::DeserializationError> for ListBasinsError {
400 fn from(e: kv::DeserializationError) -> Self {
401 Self::Storage(e.into())
402 }
403}
404
405#[derive(Debug, Clone, thiserror::Error)]
406pub enum CreateBasinError {
407 #[error(transparent)]
408 Storage(#[from] StorageError),
409 #[error(transparent)]
410 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
411 #[error(transparent)]
412 BasinDeletionPending(#[from] BasinDeletionPendingError),
413}
414
415impl From<slatedb::Error> for CreateBasinError {
416 fn from(err: slatedb::Error) -> Self {
417 Self::Storage(err.into())
418 }
419}
420
421#[derive(Debug, Clone, thiserror::Error)]
422pub enum GetBasinConfigError {
423 #[error(transparent)]
424 Storage(#[from] StorageError),
425 #[error(transparent)]
426 BasinNotFound(#[from] BasinNotFoundError),
427}
428
429#[derive(Debug, Clone, thiserror::Error)]
430pub enum ReconfigureBasinError {
431 #[error(transparent)]
432 Storage(#[from] StorageError),
433 #[error(transparent)]
434 TransactionConflict(#[from] TransactionConflictError),
435 #[error(transparent)]
436 BasinNotFound(#[from] BasinNotFoundError),
437 #[error(transparent)]
438 BasinDeletionPending(#[from] BasinDeletionPendingError),
439}
440
441impl From<slatedb::Error> for ReconfigureBasinError {
442 fn from(err: slatedb::Error) -> Self {
443 if err.kind() == slatedb::ErrorKind::Transaction {
444 Self::TransactionConflict(TransactionConflictError)
445 } else {
446 Self::Storage(err.into())
447 }
448 }
449}
450
451#[derive(Debug, Clone, thiserror::Error)]
452pub enum ReconfigureStreamError {
453 #[error(transparent)]
454 Storage(#[from] StorageError),
455 #[error(transparent)]
456 TransactionConflict(#[from] TransactionConflictError),
457 #[error(transparent)]
458 BasinNotFound(#[from] BasinNotFoundError),
459 #[error(transparent)]
460 StreamNotFound(#[from] StreamNotFoundError),
461 #[error(transparent)]
462 StreamDeletionPending(#[from] StreamDeletionPendingError),
463 #[error(transparent)]
464 Validation(#[from] s2_common::types::ValidationError),
465}
466
467impl From<slatedb::Error> for ReconfigureStreamError {
468 fn from(err: slatedb::Error) -> Self {
469 if err.kind() == slatedb::ErrorKind::Transaction {
470 Self::TransactionConflict(TransactionConflictError)
471 } else {
472 Self::Storage(err.into())
473 }
474 }
475}
476
477#[derive(Debug, Clone, thiserror::Error)]
478pub enum DeleteBasinError {
479 #[error(transparent)]
480 Storage(#[from] StorageError),
481 #[error(transparent)]
482 BasinNotFound(#[from] BasinNotFoundError),
483}
484
485impl From<slatedb::Error> for DeleteBasinError {
486 fn from(err: slatedb::Error) -> Self {
487 Self::Storage(err.into())
488 }
489}