1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 encryption::EncryptionSpecResolutionError,
5 record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6 types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
11#[derive(Debug, Clone, thiserror::Error)]
12pub enum StorageError {
13 #[error("deserialization: {0}")]
14 Deserialization(#[from] kv::DeserializationError),
15 #[error("database: {0}")]
16 Database(Arc<slatedb::Error>),
17}
18
19impl From<slatedb::Error> for StorageError {
20 fn from(error: slatedb::Error) -> Self {
21 StorageError::Database(Arc::new(error))
22 }
23}
24
25#[derive(Debug, Clone, thiserror::Error)]
26#[error("basin `{basin}` not found")]
27pub struct BasinNotFoundError {
28 pub basin: BasinName,
29}
30
31#[derive(Debug, Clone, thiserror::Error)]
32#[error("stream `{stream}` in basin `{basin}` not found")]
33pub struct StreamNotFoundError {
34 pub basin: BasinName,
35 pub stream: StreamName,
36}
37
38#[derive(Debug, Clone, thiserror::Error)]
39#[error("basin `{basin}` already exists")]
40pub struct BasinAlreadyExistsError {
41 pub basin: BasinName,
42}
43
44#[derive(Debug, Clone, thiserror::Error)]
45#[error("stream `{stream}` in basin `{basin}` already exists")]
46pub struct StreamAlreadyExistsError {
47 pub basin: BasinName,
48 pub stream: StreamName,
49}
50
51#[derive(Debug, Clone, thiserror::Error)]
52#[error("basin `{basin}` is being deleted")]
53pub struct BasinDeletionPendingError {
54 pub basin: BasinName,
55}
56
57#[derive(Debug, Clone, thiserror::Error)]
58#[error("stream `{stream}` in basin `{basin}` is being deleted")]
59pub struct StreamDeletionPendingError {
60 pub basin: BasinName,
61 pub stream: StreamName,
62}
63
64#[derive(Debug, Clone, thiserror::Error)]
65#[error("unwritten position: {0}")]
66pub struct UnwrittenError(pub StreamPosition);
67
68#[derive(Debug, Clone, thiserror::Error)]
69#[error("streamer missing in action")]
70pub struct StreamerMissingInActionError;
71
72#[derive(Debug, Clone, thiserror::Error)]
73#[error("request dropped")]
74pub struct RequestDroppedError;
75
76#[derive(Debug, Clone, thiserror::Error)]
77#[error("record timestamp was required but was missing")]
78pub struct AppendTimestampRequiredError;
79
80#[derive(Debug, Clone, thiserror::Error)]
81#[error(
82 "stream record limit exceeded: max assignable sequence number is {max_assignable_seq_num}; attempted {assigned_seq_num}"
83)]
84pub struct StreamRecordLimitExceededError {
85 pub first_seq_num: SeqNum,
86 pub assigned_seq_num: SeqNum,
87 pub max_assignable_seq_num: SeqNum,
88}
89
90#[derive(Debug, Clone, thiserror::Error)]
91#[error("transaction conflict occurred – this is usually retriable")]
92pub struct TransactionConflictError;
93
94#[derive(Debug, Clone, thiserror::Error)]
95pub enum StreamerError {
96 #[error(transparent)]
97 Storage(#[from] StorageError),
98 #[error(transparent)]
99 StreamNotFound(#[from] StreamNotFoundError),
100 #[error(transparent)]
101 StreamDeletionPending(#[from] StreamDeletionPendingError),
102}
103
104#[derive(Debug, Clone, thiserror::Error)]
105pub(super) enum AppendErrorInternal {
106 #[error(transparent)]
107 Storage(#[from] StorageError),
108 #[error(transparent)]
109 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
110 #[error(transparent)]
111 RequestDroppedError(#[from] RequestDroppedError),
112 #[error(transparent)]
113 ConditionFailed(#[from] AppendConditionFailedError),
114 #[error(transparent)]
115 TimestampMissing(#[from] AppendTimestampRequiredError),
116 #[error(transparent)]
117 StreamRecordLimitExceeded(#[from] StreamRecordLimitExceededError),
118}
119
120impl AppendErrorInternal {
121 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
122 match self {
123 Self::ConditionFailed(e) => e.durability_dependency(),
124 Self::StreamRecordLimitExceeded(e) => e.durability_dependency(),
125 _ => ..0,
126 }
127 }
128}
129
130#[derive(Debug, Clone, thiserror::Error)]
131pub enum CheckTailError {
132 #[error(transparent)]
133 Storage(#[from] StorageError),
134 #[error(transparent)]
135 TransactionConflict(#[from] TransactionConflictError),
136 #[error(transparent)]
137 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
138 #[error(transparent)]
139 BasinNotFound(#[from] BasinNotFoundError),
140 #[error(transparent)]
141 StreamNotFound(#[from] StreamNotFoundError),
142 #[error(transparent)]
143 BasinDeletionPending(#[from] BasinDeletionPendingError),
144 #[error(transparent)]
145 StreamDeletionPending(#[from] StreamDeletionPendingError),
146}
147
148impl From<StreamerError> for CheckTailError {
149 fn from(e: StreamerError) -> Self {
150 match e {
151 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
152 StreamerError::Storage(e) => Self::Storage(e),
153 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
154 }
155 }
156}
157
158#[derive(Debug, Clone, thiserror::Error)]
159pub enum AppendError {
160 #[error(transparent)]
161 Storage(#[from] StorageError),
162 #[error(transparent)]
163 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
164 #[error(transparent)]
165 TransactionConflict(#[from] TransactionConflictError),
166 #[error(transparent)]
167 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
168 #[error(transparent)]
169 RequestDroppedError(#[from] RequestDroppedError),
170 #[error(transparent)]
171 BasinNotFound(#[from] BasinNotFoundError),
172 #[error(transparent)]
173 StreamNotFound(#[from] StreamNotFoundError),
174 #[error(transparent)]
175 BasinDeletionPending(#[from] BasinDeletionPendingError),
176 #[error(transparent)]
177 StreamDeletionPending(#[from] StreamDeletionPendingError),
178 #[error(transparent)]
179 ConditionFailed(#[from] AppendConditionFailedError),
180 #[error(transparent)]
181 TimestampMissing(#[from] AppendTimestampRequiredError),
182 #[error(transparent)]
183 StreamRecordLimitExceeded(#[from] StreamRecordLimitExceededError),
184}
185
186impl From<AppendErrorInternal> for AppendError {
187 fn from(e: AppendErrorInternal) -> Self {
188 match e {
189 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
190 AppendErrorInternal::StreamerMissingInActionError(e) => {
191 AppendError::StreamerMissingInActionError(e)
192 }
193 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
194 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
195 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
196 AppendErrorInternal::StreamRecordLimitExceeded(e) => {
197 AppendError::StreamRecordLimitExceeded(e)
198 }
199 }
200 }
201}
202
203#[derive(Debug, Clone, thiserror::Error)]
204pub enum AppendConditionFailedError {
205 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
206 FencingTokenMismatch {
207 expected: FencingToken,
208 actual: FencingToken,
209 applied_point: RangeTo<SeqNum>,
210 },
211 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
212 SeqNumMismatch {
213 assigned_seq_num: SeqNum,
214 match_seq_num: SeqNum,
215 },
216}
217
218impl AppendConditionFailedError {
219 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
220 use AppendConditionFailedError::*;
221 match self {
222 SeqNumMismatch {
223 assigned_seq_num, ..
224 } => ..*assigned_seq_num,
225 FencingTokenMismatch { applied_point, .. } => *applied_point,
226 }
227 }
228}
229
230impl StreamRecordLimitExceededError {
231 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
232 ..self.first_seq_num
233 }
234}
235
236impl From<StreamerError> for AppendError {
237 fn from(e: StreamerError) -> Self {
238 match e {
239 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
240 StreamerError::Storage(e) => Self::Storage(e),
241 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
242 }
243 }
244}
245
246#[derive(Debug, Clone, thiserror::Error)]
247pub enum ReadError {
248 #[error(transparent)]
249 Storage(#[from] StorageError),
250 #[error(transparent)]
251 EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
252 #[error(transparent)]
253 RecordDecryption(#[from] RecordDecryptionError),
254 #[error(transparent)]
255 TransactionConflict(#[from] TransactionConflictError),
256 #[error(transparent)]
257 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
258 #[error(transparent)]
259 BasinNotFound(#[from] BasinNotFoundError),
260 #[error(transparent)]
261 StreamNotFound(#[from] StreamNotFoundError),
262 #[error(transparent)]
263 BasinDeletionPending(#[from] BasinDeletionPendingError),
264 #[error(transparent)]
265 StreamDeletionPending(#[from] StreamDeletionPendingError),
266 #[error(transparent)]
267 Unwritten(#[from] UnwrittenError),
268}
269
270impl From<StreamerError> for ReadError {
271 fn from(e: StreamerError) -> Self {
272 match e {
273 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
274 StreamerError::Storage(e) => Self::Storage(e),
275 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
276 }
277 }
278}
279
280impl From<kv::DeserializationError> for ReadError {
281 fn from(e: kv::DeserializationError) -> Self {
282 Self::Storage(e.into())
283 }
284}
285
286impl From<slatedb::Error> for ReadError {
287 fn from(e: slatedb::Error) -> Self {
288 Self::Storage(e.into())
289 }
290}
291
292#[derive(Debug, Clone, thiserror::Error)]
293pub enum ListStreamsError {
294 #[error(transparent)]
295 Storage(#[from] StorageError),
296}
297
298impl From<slatedb::Error> for ListStreamsError {
299 fn from(e: slatedb::Error) -> Self {
300 Self::Storage(e.into())
301 }
302}
303
304impl From<kv::DeserializationError> for ListStreamsError {
305 fn from(e: kv::DeserializationError) -> Self {
306 Self::Storage(e.into())
307 }
308}
309
310#[derive(Debug, Clone, thiserror::Error)]
311pub enum CreateStreamError {
312 #[error(transparent)]
313 Storage(#[from] StorageError),
314 #[error(transparent)]
315 TransactionConflict(#[from] TransactionConflictError),
316 #[error(transparent)]
317 BasinNotFound(#[from] BasinNotFoundError),
318 #[error(transparent)]
319 BasinDeletionPending(#[from] BasinDeletionPendingError),
320 #[error(transparent)]
321 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
322 #[error(transparent)]
323 StreamDeletionPending(#[from] StreamDeletionPendingError),
324 #[error(transparent)]
325 Validation(#[from] s2_common::types::ValidationError),
326}
327
328impl From<slatedb::Error> for CreateStreamError {
329 fn from(err: slatedb::Error) -> Self {
330 if err.kind() == slatedb::ErrorKind::Transaction {
331 Self::TransactionConflict(TransactionConflictError)
332 } else {
333 Self::Storage(err.into())
334 }
335 }
336}
337
338impl From<GetBasinConfigError> for CreateStreamError {
339 fn from(err: GetBasinConfigError) -> Self {
340 match err {
341 GetBasinConfigError::Storage(e) => Self::Storage(e),
342 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
343 }
344 }
345}
346
347#[derive(Debug, Clone, thiserror::Error)]
348pub enum GetStreamConfigError {
349 #[error(transparent)]
350 Storage(#[from] StorageError),
351 #[error(transparent)]
352 StreamNotFound(#[from] StreamNotFoundError),
353 #[error(transparent)]
354 StreamDeletionPending(#[from] StreamDeletionPendingError),
355}
356
357#[derive(Debug, Clone, thiserror::Error)]
358pub enum DeleteStreamError {
359 #[error(transparent)]
360 Storage(#[from] StorageError),
361 #[error(transparent)]
362 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
363 #[error(transparent)]
364 RequestDroppedError(#[from] RequestDroppedError),
365 #[error(transparent)]
366 StreamNotFound(#[from] StreamNotFoundError),
367}
368
369impl From<slatedb::Error> for DeleteStreamError {
370 fn from(err: slatedb::Error) -> Self {
371 Self::Storage(err.into())
372 }
373}
374
375#[derive(Debug, Clone, thiserror::Error)]
376pub enum BasinDeletionError {
377 #[error(transparent)]
378 Storage(#[from] StorageError),
379 #[error(transparent)]
380 DeleteStream(#[from] DeleteStreamError),
381}
382
383#[derive(Debug, Clone, thiserror::Error)]
384pub enum StreamDeleteOnEmptyError {
385 #[error(transparent)]
386 Storage(#[from] StorageError),
387 #[error(transparent)]
388 DeleteStream(#[from] DeleteStreamError),
389}
390
391#[derive(Debug, Clone, thiserror::Error)]
392pub enum ListBasinsError {
393 #[error(transparent)]
394 Storage(#[from] StorageError),
395}
396
397impl From<slatedb::Error> for ListBasinsError {
398 fn from(err: slatedb::Error) -> Self {
399 Self::Storage(err.into())
400 }
401}
402
403impl From<kv::DeserializationError> for ListBasinsError {
404 fn from(e: kv::DeserializationError) -> Self {
405 Self::Storage(e.into())
406 }
407}
408
409#[derive(Debug, Clone, thiserror::Error)]
410pub enum CreateBasinError {
411 #[error(transparent)]
412 Storage(#[from] StorageError),
413 #[error(transparent)]
414 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
415 #[error(transparent)]
416 BasinDeletionPending(#[from] BasinDeletionPendingError),
417}
418
419impl From<slatedb::Error> for CreateBasinError {
420 fn from(err: slatedb::Error) -> Self {
421 Self::Storage(err.into())
422 }
423}
424
425#[derive(Debug, Clone, thiserror::Error)]
426pub enum GetBasinConfigError {
427 #[error(transparent)]
428 Storage(#[from] StorageError),
429 #[error(transparent)]
430 BasinNotFound(#[from] BasinNotFoundError),
431}
432
433#[derive(Debug, Clone, thiserror::Error)]
434pub enum ReconfigureBasinError {
435 #[error(transparent)]
436 Storage(#[from] StorageError),
437 #[error(transparent)]
438 TransactionConflict(#[from] TransactionConflictError),
439 #[error(transparent)]
440 BasinNotFound(#[from] BasinNotFoundError),
441 #[error(transparent)]
442 BasinDeletionPending(#[from] BasinDeletionPendingError),
443}
444
445impl From<slatedb::Error> for ReconfigureBasinError {
446 fn from(err: slatedb::Error) -> Self {
447 if err.kind() == slatedb::ErrorKind::Transaction {
448 Self::TransactionConflict(TransactionConflictError)
449 } else {
450 Self::Storage(err.into())
451 }
452 }
453}
454
455#[derive(Debug, Clone, thiserror::Error)]
456pub enum ReconfigureStreamError {
457 #[error(transparent)]
458 Storage(#[from] StorageError),
459 #[error(transparent)]
460 TransactionConflict(#[from] TransactionConflictError),
461 #[error(transparent)]
462 BasinNotFound(#[from] BasinNotFoundError),
463 #[error(transparent)]
464 StreamNotFound(#[from] StreamNotFoundError),
465 #[error(transparent)]
466 StreamDeletionPending(#[from] StreamDeletionPendingError),
467 #[error(transparent)]
468 Validation(#[from] s2_common::types::ValidationError),
469}
470
471impl From<slatedb::Error> for ReconfigureStreamError {
472 fn from(err: slatedb::Error) -> Self {
473 if err.kind() == slatedb::ErrorKind::Transaction {
474 Self::TransactionConflict(TransactionConflictError)
475 } else {
476 Self::Storage(err.into())
477 }
478 }
479}
480
481#[derive(Debug, Clone, thiserror::Error)]
482pub enum DeleteBasinError {
483 #[error(transparent)]
484 Storage(#[from] StorageError),
485 #[error(transparent)]
486 BasinNotFound(#[from] BasinNotFoundError),
487}
488
489impl From<slatedb::Error> for DeleteBasinError {
490 fn from(err: slatedb::Error) -> Self {
491 Self::Storage(err.into())
492 }
493}