1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 encryption::EncryptionSpecResolutionError,
5 record::{FencingToken, RecordDecryptionError, SeqNum, StreamPosition},
6 types::{basin::BasinName, stream::StreamName},
7};
8
9use crate::backend::kv;
10
/// Low-level failure with two possible origins: `kv` deserialization or SlateDB.
///
/// The type derives `Clone`, so a single failure value can be duplicated.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StorageError {
    /// Wraps a `kv::DeserializationError`; `#[from]` generates the `From` conversion.
    #[error("deserialization: {0}")]
    Deserialization(#[from] kv::DeserializationError),
    /// Wraps an error returned by SlateDB. The error is held behind an `Arc`, so cloning
    /// this variant only bumps a reference count.
    #[error("database: {0}")]
    Database(Arc<slatedb::Error>),
}
18
19impl From<slatedb::Error> for StorageError {
20 fn from(error: slatedb::Error) -> Self {
21 StorageError::Database(Arc::new(error))
22 }
23}
24
/// Error stating that a basin was not found.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` not found")]
pub struct BasinNotFoundError {
    /// Name of the basin; interpolated into the error message.
    pub basin: BasinName,
}
30
/// Error stating that a stream was not found in a given basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` not found")]
pub struct StreamNotFoundError {
    /// Name of the basin named in the message.
    pub basin: BasinName,
    /// Name of the stream named in the message.
    pub stream: StreamName,
}
37
/// Error stating that a basin with the given name already exists.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` already exists")]
pub struct BasinAlreadyExistsError {
    /// Name of the already-existing basin; interpolated into the error message.
    pub basin: BasinName,
}
43
/// Error stating that a stream with the given name already exists in a basin.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` already exists")]
pub struct StreamAlreadyExistsError {
    /// Name of the basin named in the message.
    pub basin: BasinName,
    /// Name of the already-existing stream.
    pub stream: StreamName,
}
50
/// Error stating that a basin is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("basin `{basin}` is being deleted")]
pub struct BasinDeletionPendingError {
    /// Name of the basin being deleted; interpolated into the error message.
    pub basin: BasinName,
}
56
/// Error stating that a stream is in the process of being deleted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("stream `{stream}` in basin `{basin}` is being deleted")]
pub struct StreamDeletionPendingError {
    /// Name of the basin named in the message.
    pub basin: BasinName,
    /// Name of the stream being deleted.
    pub stream: StreamName,
}
63
/// Error about an unwritten position.
///
/// The wrapped [`StreamPosition`] is rendered in the message.
/// NOTE(review): which position callers store here (e.g. the current tail) is not visible
/// in this file — confirm at the construction sites.
#[derive(Debug, Clone, thiserror::Error)]
#[error("unwritten position: {0}")]
pub struct UnwrittenError(pub StreamPosition);
67
/// Unit error whose message is `streamer missing in action`.
///
/// NOTE(review): presumably raised when the streamer serving a request is no longer
/// reachable — confirm against the code that constructs it.
#[derive(Debug, Clone, thiserror::Error)]
#[error("streamer missing in action")]
pub struct StreamerMissingInActionError;
71
/// Unit error whose message is `request dropped`.
///
/// NOTE(review): presumably signals that a request was discarded before a response was
/// produced — confirm against the code that constructs it.
#[derive(Debug, Clone, thiserror::Error)]
#[error("request dropped")]
pub struct RequestDroppedError;
75
/// Unit error stating that a record timestamp was required but was not supplied.
#[derive(Debug, Clone, thiserror::Error)]
#[error("record timestamp was required but was missing")]
pub struct AppendTimestampRequiredError;
79
/// Error stating that the encrypted-record limit of a stream was exceeded.
///
/// Per the message, records must be assigned sequence numbers strictly below `limit`.
#[derive(Debug, Clone, thiserror::Error)]
#[error(
    "stream encrypted record limit exceeded: records must be assigned sequence numbers below {limit}; attempted {assigned_seq_num}"
)]
pub struct StreamEncryptedRecordLimitExceededError {
    /// Sequence number that was attempted (shown after `attempted` in the message).
    pub assigned_seq_num: SeqNum,
    /// Bound that assigned sequence numbers must stay below.
    pub limit: SeqNum,
}
88
/// Unit error for a transaction conflict; its message notes that this is usually retriable.
#[derive(Debug, Clone, thiserror::Error)]
#[error("transaction conflict occurred – this is usually retriable")]
pub struct TransactionConflictError;
92
/// Streamer-related error with three possible causes.
///
/// Every variant is `#[error(transparent)]`, so `Display` and `source()` are forwarded to
/// the wrapped error, and `#[from]` generates the matching `From` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StreamerError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
102
/// Append error visible only to the parent module (`pub(super)`).
///
/// It can be converted into the public [`AppendError`], which has a same-named variant
/// for each variant here. All variants forward `Display`/`source()` to the wrapped error
/// and have a generated `From` conversion.
#[derive(Debug, Clone, thiserror::Error)]
pub(super) enum AppendErrorInternal {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// See [`StreamerMissingInActionError`].
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// See [`RequestDroppedError`].
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// A fencing-token or sequence-number condition did not hold.
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    /// A record timestamp was required but missing.
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
    /// The stream's encrypted-record limit was exceeded.
    #[error(transparent)]
    StreamEncryptedRecordLimitExceeded(#[from] StreamEncryptedRecordLimitExceededError),
}
118
/// Error type for the check-tail operation (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CheckTailError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// See [`StreamerMissingInActionError`].
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
136
137impl From<StreamerError> for CheckTailError {
138 fn from(e: StreamerError) -> Self {
139 match e {
140 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
141 StreamerError::Storage(e) => Self::Storage(e),
142 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
143 }
144 }
145}
146
/// Public error type for the append operation (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Resolving the encryption spec failed.
    #[error(transparent)]
    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// See [`StreamerMissingInActionError`].
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// See [`RequestDroppedError`].
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// A fencing-token or sequence-number condition did not hold.
    #[error(transparent)]
    ConditionFailed(#[from] AppendConditionFailedError),
    /// A record timestamp was required but missing.
    #[error(transparent)]
    TimestampMissing(#[from] AppendTimestampRequiredError),
    /// The stream's encrypted-record limit was exceeded.
    #[error(transparent)]
    StreamEncryptedRecordLimitExceeded(#[from] StreamEncryptedRecordLimitExceededError),
}
174
175impl From<AppendErrorInternal> for AppendError {
176 fn from(e: AppendErrorInternal) -> Self {
177 match e {
178 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
179 AppendErrorInternal::StreamerMissingInActionError(e) => {
180 AppendError::StreamerMissingInActionError(e)
181 }
182 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
183 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
184 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
185 AppendErrorInternal::StreamEncryptedRecordLimitExceeded(e) => {
186 AppendError::StreamEncryptedRecordLimitExceeded(e)
187 }
188 }
189 }
190}
191
/// Reasons an append condition can fail: a fencing-token or a sequence-number mismatch.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailedError {
    /// The fencing tokens differ; the message shows `expected` and `actual`.
    #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
    FencingTokenMismatch {
        /// Token shown as "expected" in the message.
        expected: FencingToken,
        /// Token shown as "actual" in the message.
        actual: FencingToken,
        /// Not part of the message; returned as-is by `durability_dependency`.
        applied_point: RangeTo<SeqNum>,
    },
    /// The sequence numbers differ; the message shows `match_seq_num` as expected and
    /// `assigned_seq_num` as actual.
    #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
    SeqNumMismatch {
        /// Sequence number shown as "actual" in the message.
        assigned_seq_num: SeqNum,
        /// Sequence number shown as "expected" in the message.
        match_seq_num: SeqNum,
    },
}
206
207impl AppendConditionFailedError {
208 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
209 use AppendConditionFailedError::*;
210 match self {
211 SeqNumMismatch {
212 assigned_seq_num, ..
213 } => ..*assigned_seq_num,
214 FencingTokenMismatch { applied_point, .. } => *applied_point,
215 }
216 }
217}
218
219impl From<StreamerError> for AppendError {
220 fn from(e: StreamerError) -> Self {
221 match e {
222 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
223 StreamerError::Storage(e) => Self::Storage(e),
224 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
225 }
226 }
227}
228
/// Error type for the read operation (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReadError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Resolving the encryption spec failed.
    #[error(transparent)]
    EncryptionSpecResolution(#[from] EncryptionSpecResolutionError),
    /// A record could not be decrypted.
    #[error(transparent)]
    RecordDecryption(#[from] RecordDecryptionError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// See [`StreamerMissingInActionError`].
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// The position is unwritten; see [`UnwrittenError`].
    #[error(transparent)]
    Unwritten(#[from] UnwrittenError),
}
252
253impl From<StreamerError> for ReadError {
254 fn from(e: StreamerError) -> Self {
255 match e {
256 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
257 StreamerError::Storage(e) => Self::Storage(e),
258 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
259 }
260 }
261}
262
263impl From<kv::DeserializationError> for ReadError {
264 fn from(e: kv::DeserializationError) -> Self {
265 Self::Storage(e.into())
266 }
267}
268
269impl From<slatedb::Error> for ReadError {
270 fn from(e: slatedb::Error) -> Self {
271 Self::Storage(e.into())
272 }
273}
274
/// Error type for listing streams (per its name); storage failure is its only variant.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListStreamsError {
    /// Underlying storage failure; `Display`/`source()` are forwarded to it.
    #[error(transparent)]
    Storage(#[from] StorageError),
}
280
281impl From<slatedb::Error> for ListStreamsError {
282 fn from(e: slatedb::Error) -> Self {
283 Self::Storage(e.into())
284 }
285}
286
287impl From<kv::DeserializationError> for ListStreamsError {
288 fn from(e: kv::DeserializationError) -> Self {
289 Self::Storage(e.into())
290 }
291}
292
/// Error type for creating a stream (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateStreamError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
    /// A stream with that name already exists.
    #[error(transparent)]
    StreamAlreadyExists(#[from] StreamAlreadyExistsError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// A `s2_common` validation error.
    #[error(transparent)]
    Validation(#[from] s2_common::types::ValidationError),
}
310
311impl From<slatedb::Error> for CreateStreamError {
312 fn from(err: slatedb::Error) -> Self {
313 if err.kind() == slatedb::ErrorKind::Transaction {
314 Self::TransactionConflict(TransactionConflictError)
315 } else {
316 Self::Storage(err.into())
317 }
318 }
319}
320
321impl From<GetBasinConfigError> for CreateStreamError {
322 fn from(err: GetBasinConfigError) -> Self {
323 match err {
324 GetBasinConfigError::Storage(e) => Self::Storage(e),
325 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
326 }
327 }
328}
329
/// Error type for fetching a stream's configuration (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetStreamConfigError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
}
339
/// Error type for deleting a stream (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteStreamError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// See [`StreamerMissingInActionError`].
    #[error(transparent)]
    StreamerMissingInActionError(#[from] StreamerMissingInActionError),
    /// See [`RequestDroppedError`].
    #[error(transparent)]
    RequestDroppedError(#[from] RequestDroppedError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
}
351
352impl From<slatedb::Error> for DeleteStreamError {
353 fn from(err: slatedb::Error) -> Self {
354 Self::Storage(err.into())
355 }
356}
357
/// Error type for basin deletion (per its name): either a storage failure or a failure
/// to delete a stream.
#[derive(Debug, Clone, thiserror::Error)]
pub enum BasinDeletionError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Deleting a stream failed; wraps the full [`DeleteStreamError`].
    #[error(transparent)]
    DeleteStream(#[from] DeleteStreamError),
}
365
/// Error type for delete-on-empty handling of a stream (per its name): either a storage
/// failure or a failure to delete the stream.
#[derive(Debug, Clone, thiserror::Error)]
pub enum StreamDeleteOnEmptyError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// Deleting the stream failed; wraps the full [`DeleteStreamError`].
    #[error(transparent)]
    DeleteStream(#[from] DeleteStreamError),
}
373
/// Error type for listing basins (per its name); storage failure is its only variant.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ListBasinsError {
    /// Underlying storage failure; `Display`/`source()` are forwarded to it.
    #[error(transparent)]
    Storage(#[from] StorageError),
}
379
380impl From<slatedb::Error> for ListBasinsError {
381 fn from(err: slatedb::Error) -> Self {
382 Self::Storage(err.into())
383 }
384}
385
386impl From<kv::DeserializationError> for ListBasinsError {
387 fn from(e: kv::DeserializationError) -> Self {
388 Self::Storage(e.into())
389 }
390}
391
/// Error type for creating a basin (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CreateBasinError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A basin with that name already exists.
    #[error(transparent)]
    BasinAlreadyExists(#[from] BasinAlreadyExistsError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
401
402impl From<slatedb::Error> for CreateBasinError {
403 fn from(err: slatedb::Error) -> Self {
404 Self::Storage(err.into())
405 }
406}
407
/// Error type for fetching a basin's configuration (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GetBasinConfigError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
415
/// Error type for reconfiguring a basin (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureBasinError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The basin is being deleted.
    #[error(transparent)]
    BasinDeletionPending(#[from] BasinDeletionPendingError),
}
427
428impl From<slatedb::Error> for ReconfigureBasinError {
429 fn from(err: slatedb::Error) -> Self {
430 if err.kind() == slatedb::ErrorKind::Transaction {
431 Self::TransactionConflict(TransactionConflictError)
432 } else {
433 Self::Storage(err.into())
434 }
435 }
436}
437
/// Error type for reconfiguring a stream (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReconfigureStreamError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// A transaction conflict occurred.
    #[error(transparent)]
    TransactionConflict(#[from] TransactionConflictError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
    /// The stream was not found.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFoundError),
    /// The stream is being deleted.
    #[error(transparent)]
    StreamDeletionPending(#[from] StreamDeletionPendingError),
    /// A `s2_common` validation error.
    #[error(transparent)]
    Validation(#[from] s2_common::types::ValidationError),
}
453
454impl From<slatedb::Error> for ReconfigureStreamError {
455 fn from(err: slatedb::Error) -> Self {
456 if err.kind() == slatedb::ErrorKind::Transaction {
457 Self::TransactionConflict(TransactionConflictError)
458 } else {
459 Self::Storage(err.into())
460 }
461 }
462}
463
/// Error type for deleting a basin (per its name).
///
/// Every variant transparently wraps a more specific error and has a generated `From`
/// conversion from it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DeleteBasinError {
    /// Underlying storage failure.
    #[error(transparent)]
    Storage(#[from] StorageError),
    /// The basin was not found.
    #[error(transparent)]
    BasinNotFound(#[from] BasinNotFoundError),
}
471
472impl From<slatedb::Error> for DeleteBasinError {
473 fn from(err: slatedb::Error) -> Self {
474 Self::Storage(err.into())
475 }
476}