1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 record::{FencingToken, SeqNum, StreamPosition},
5 types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12 #[error("deserialization: {0}")]
13 Deserialization(#[from] kv::DeserializationError),
14 #[error("database: {0}")]
15 Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19 fn from(error: slatedb::Error) -> Self {
20 StorageError::Database(Arc::new(error))
21 }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27 pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33 pub basin: BasinName,
34 pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40 pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46 pub basin: BasinName,
47 pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53 pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59 pub basin: BasinName,
60 pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85 #[error(transparent)]
86 Storage(#[from] StorageError),
87 #[error(transparent)]
88 StreamNotFound(#[from] StreamNotFoundError),
89 #[error(transparent)]
90 StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95 #[error(transparent)]
96 Storage(#[from] StorageError),
97 #[error(transparent)]
98 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
99 #[error(transparent)]
100 RequestDroppedError(#[from] RequestDroppedError),
101 #[error(transparent)]
102 ConditionFailed(#[from] AppendConditionFailedError),
103 #[error(transparent)]
104 TimestampMissing(#[from] AppendTimestampRequiredError),
105}
106
107#[derive(Debug, Clone, thiserror::Error)]
108pub enum CheckTailError {
109 #[error(transparent)]
110 Storage(#[from] StorageError),
111 #[error(transparent)]
112 TransactionConflict(#[from] TransactionConflictError),
113 #[error(transparent)]
114 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
115 #[error(transparent)]
116 BasinNotFound(#[from] BasinNotFoundError),
117 #[error(transparent)]
118 StreamNotFound(#[from] StreamNotFoundError),
119 #[error(transparent)]
120 BasinDeletionPending(#[from] BasinDeletionPendingError),
121 #[error(transparent)]
122 StreamDeletionPending(#[from] StreamDeletionPendingError),
123}
124
125impl From<StreamerError> for CheckTailError {
126 fn from(e: StreamerError) -> Self {
127 match e {
128 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
129 StreamerError::Storage(e) => Self::Storage(e),
130 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
131 }
132 }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136pub enum AppendError {
137 #[error(transparent)]
138 Storage(#[from] StorageError),
139 #[error(transparent)]
140 TransactionConflict(#[from] TransactionConflictError),
141 #[error(transparent)]
142 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
143 #[error(transparent)]
144 RequestDroppedError(#[from] RequestDroppedError),
145 #[error(transparent)]
146 BasinNotFound(#[from] BasinNotFoundError),
147 #[error(transparent)]
148 StreamNotFound(#[from] StreamNotFoundError),
149 #[error(transparent)]
150 BasinDeletionPending(#[from] BasinDeletionPendingError),
151 #[error(transparent)]
152 StreamDeletionPending(#[from] StreamDeletionPendingError),
153 #[error(transparent)]
154 ConditionFailed(#[from] AppendConditionFailedError),
155 #[error(transparent)]
156 TimestampMissing(#[from] AppendTimestampRequiredError),
157}
158
159impl From<AppendErrorInternal> for AppendError {
160 fn from(e: AppendErrorInternal) -> Self {
161 match e {
162 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
163 AppendErrorInternal::StreamerMissingInActionError(e) => {
164 AppendError::StreamerMissingInActionError(e)
165 }
166 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
167 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
168 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
169 }
170 }
171}
172
173#[derive(Debug, Clone, thiserror::Error)]
174pub enum AppendConditionFailedError {
175 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
176 FencingTokenMismatch {
177 expected: FencingToken,
178 actual: FencingToken,
179 applied_point: RangeTo<SeqNum>,
180 },
181 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
182 SeqNumMismatch {
183 assigned_seq_num: SeqNum,
184 match_seq_num: SeqNum,
185 },
186}
187
188impl AppendConditionFailedError {
189 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
190 use AppendConditionFailedError::*;
191 match self {
192 SeqNumMismatch {
193 assigned_seq_num, ..
194 } => ..*assigned_seq_num,
195 FencingTokenMismatch { applied_point, .. } => *applied_point,
196 }
197 }
198}
199
200impl From<StreamerError> for AppendError {
201 fn from(e: StreamerError) -> Self {
202 match e {
203 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
204 StreamerError::Storage(e) => Self::Storage(e),
205 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
206 }
207 }
208}
209
210#[derive(Debug, Clone, thiserror::Error)]
211pub enum ReadError {
212 #[error(transparent)]
213 Storage(#[from] StorageError),
214 #[error(transparent)]
215 TransactionConflict(#[from] TransactionConflictError),
216 #[error(transparent)]
217 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
218 #[error(transparent)]
219 BasinNotFound(#[from] BasinNotFoundError),
220 #[error(transparent)]
221 StreamNotFound(#[from] StreamNotFoundError),
222 #[error(transparent)]
223 BasinDeletionPending(#[from] BasinDeletionPendingError),
224 #[error(transparent)]
225 StreamDeletionPending(#[from] StreamDeletionPendingError),
226 #[error(transparent)]
227 Unwritten(#[from] UnwrittenError),
228}
229
230impl From<StreamerError> for ReadError {
231 fn from(e: StreamerError) -> Self {
232 match e {
233 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
234 StreamerError::Storage(e) => Self::Storage(e),
235 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
236 }
237 }
238}
239
240impl From<kv::DeserializationError> for ReadError {
241 fn from(e: kv::DeserializationError) -> Self {
242 Self::Storage(e.into())
243 }
244}
245
246impl From<slatedb::Error> for ReadError {
247 fn from(e: slatedb::Error) -> Self {
248 Self::Storage(e.into())
249 }
250}
251
252#[derive(Debug, Clone, thiserror::Error)]
253pub enum ListStreamsError {
254 #[error(transparent)]
255 Storage(#[from] StorageError),
256}
257
258impl From<slatedb::Error> for ListStreamsError {
259 fn from(e: slatedb::Error) -> Self {
260 Self::Storage(e.into())
261 }
262}
263
264impl From<kv::DeserializationError> for ListStreamsError {
265 fn from(e: kv::DeserializationError) -> Self {
266 Self::Storage(e.into())
267 }
268}
269
270#[derive(Debug, Clone, thiserror::Error)]
271pub enum CreateStreamError {
272 #[error(transparent)]
273 Storage(#[from] StorageError),
274 #[error(transparent)]
275 TransactionConflict(#[from] TransactionConflictError),
276 #[error(transparent)]
277 BasinNotFound(#[from] BasinNotFoundError),
278 #[error(transparent)]
279 BasinDeletionPending(#[from] BasinDeletionPendingError),
280 #[error(transparent)]
281 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
282 #[error(transparent)]
283 StreamDeletionPending(#[from] StreamDeletionPendingError),
284}
285
286impl From<slatedb::Error> for CreateStreamError {
287 fn from(err: slatedb::Error) -> Self {
288 if err.kind() == slatedb::ErrorKind::Transaction {
289 Self::TransactionConflict(TransactionConflictError)
290 } else {
291 Self::Storage(err.into())
292 }
293 }
294}
295
296impl From<GetBasinConfigError> for CreateStreamError {
297 fn from(err: GetBasinConfigError) -> Self {
298 match err {
299 GetBasinConfigError::Storage(e) => Self::Storage(e),
300 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
301 }
302 }
303}
304
305#[derive(Debug, Clone, thiserror::Error)]
306pub enum GetStreamConfigError {
307 #[error(transparent)]
308 Storage(#[from] StorageError),
309 #[error(transparent)]
310 StreamNotFound(#[from] StreamNotFoundError),
311 #[error(transparent)]
312 StreamDeletionPending(#[from] StreamDeletionPendingError),
313}
314
315#[derive(Debug, Clone, thiserror::Error)]
316pub enum DeleteStreamError {
317 #[error(transparent)]
318 Storage(#[from] StorageError),
319 #[error(transparent)]
320 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
321 #[error(transparent)]
322 RequestDroppedError(#[from] RequestDroppedError),
323 #[error(transparent)]
324 StreamNotFound(#[from] StreamNotFoundError),
325}
326
327impl From<slatedb::Error> for DeleteStreamError {
328 fn from(err: slatedb::Error) -> Self {
329 Self::Storage(err.into())
330 }
331}
332
333#[derive(Debug, Clone, thiserror::Error)]
334pub enum BasinDeletionError {
335 #[error(transparent)]
336 Storage(#[from] StorageError),
337 #[error(transparent)]
338 DeleteStream(#[from] DeleteStreamError),
339}
340
341#[derive(Debug, Clone, thiserror::Error)]
342pub enum StreamDeleteOnEmptyError {
343 #[error(transparent)]
344 Storage(#[from] StorageError),
345 #[error(transparent)]
346 DeleteStream(#[from] DeleteStreamError),
347}
348
349#[derive(Debug, Clone, thiserror::Error)]
350pub enum ListBasinsError {
351 #[error(transparent)]
352 Storage(#[from] StorageError),
353}
354
355impl From<slatedb::Error> for ListBasinsError {
356 fn from(err: slatedb::Error) -> Self {
357 Self::Storage(err.into())
358 }
359}
360
361impl From<kv::DeserializationError> for ListBasinsError {
362 fn from(e: kv::DeserializationError) -> Self {
363 Self::Storage(e.into())
364 }
365}
366
367#[derive(Debug, Clone, thiserror::Error)]
368pub enum CreateBasinError {
369 #[error(transparent)]
370 Storage(#[from] StorageError),
371 #[error(transparent)]
372 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
373 #[error(transparent)]
374 BasinDeletionPending(#[from] BasinDeletionPendingError),
375}
376
377impl From<slatedb::Error> for CreateBasinError {
378 fn from(err: slatedb::Error) -> Self {
379 Self::Storage(err.into())
380 }
381}
382
383#[derive(Debug, Clone, thiserror::Error)]
384pub enum GetBasinConfigError {
385 #[error(transparent)]
386 Storage(#[from] StorageError),
387 #[error(transparent)]
388 BasinNotFound(#[from] BasinNotFoundError),
389}
390
391#[derive(Debug, Clone, thiserror::Error)]
392pub enum ReconfigureBasinError {
393 #[error(transparent)]
394 Storage(#[from] StorageError),
395 #[error(transparent)]
396 TransactionConflict(#[from] TransactionConflictError),
397 #[error(transparent)]
398 BasinNotFound(#[from] BasinNotFoundError),
399 #[error(transparent)]
400 BasinDeletionPending(#[from] BasinDeletionPendingError),
401}
402
403impl From<slatedb::Error> for ReconfigureBasinError {
404 fn from(err: slatedb::Error) -> Self {
405 if err.kind() == slatedb::ErrorKind::Transaction {
406 Self::TransactionConflict(TransactionConflictError)
407 } else {
408 Self::Storage(err.into())
409 }
410 }
411}
412
413#[derive(Debug, Clone, thiserror::Error)]
414pub enum ReconfigureStreamError {
415 #[error(transparent)]
416 Storage(#[from] StorageError),
417 #[error(transparent)]
418 TransactionConflict(#[from] TransactionConflictError),
419 #[error(transparent)]
420 StreamNotFound(#[from] StreamNotFoundError),
421 #[error(transparent)]
422 StreamDeletionPending(#[from] StreamDeletionPendingError),
423}
424
425impl From<slatedb::Error> for ReconfigureStreamError {
426 fn from(err: slatedb::Error) -> Self {
427 if err.kind() == slatedb::ErrorKind::Transaction {
428 Self::TransactionConflict(TransactionConflictError)
429 } else {
430 Self::Storage(err.into())
431 }
432 }
433}
434
435#[derive(Debug, Clone, thiserror::Error)]
436pub enum DeleteBasinError {
437 #[error(transparent)]
438 Storage(#[from] StorageError),
439 #[error(transparent)]
440 BasinNotFound(#[from] BasinNotFoundError),
441}
442
443impl From<slatedb::Error> for DeleteBasinError {
444 fn from(err: slatedb::Error) -> Self {
445 Self::Storage(err.into())
446 }
447}