1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 record::{FencingToken, SeqNum, StreamPosition},
5 types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12 #[error("deserialization: {0}")]
13 Deserialization(#[from] kv::DeserializationError),
14 #[error("database: {0}")]
15 Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19 fn from(error: slatedb::Error) -> Self {
20 StorageError::Database(Arc::new(error))
21 }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27 pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33 pub basin: BasinName,
34 pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40 pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46 pub basin: BasinName,
47 pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53 pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59 pub basin: BasinName,
60 pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68#[error("streamer missing in action")]
69pub struct StreamerMissingInActionError;
70
71#[derive(Debug, Clone, thiserror::Error)]
72#[error("request dropped")]
73pub struct RequestDroppedError;
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85 #[error(transparent)]
86 Storage(#[from] StorageError),
87 #[error(transparent)]
88 StreamNotFound(#[from] StreamNotFoundError),
89 #[error(transparent)]
90 StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95 #[error(transparent)]
96 Storage(#[from] StorageError),
97 #[error(transparent)]
98 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
99 #[error(transparent)]
100 RequestDroppedError(#[from] RequestDroppedError),
101 #[error(transparent)]
102 ConditionFailed(#[from] AppendConditionFailedError),
103 #[error(transparent)]
104 TimestampMissing(#[from] AppendTimestampRequiredError),
105}
106
107#[derive(Debug, Clone, thiserror::Error)]
108pub enum CheckTailError {
109 #[error(transparent)]
110 Storage(#[from] StorageError),
111 #[error(transparent)]
112 TransactionConflict(#[from] TransactionConflictError),
113 #[error(transparent)]
114 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
115 #[error(transparent)]
116 BasinNotFound(#[from] BasinNotFoundError),
117 #[error(transparent)]
118 StreamNotFound(#[from] StreamNotFoundError),
119 #[error(transparent)]
120 BasinDeletionPending(#[from] BasinDeletionPendingError),
121 #[error(transparent)]
122 StreamDeletionPending(#[from] StreamDeletionPendingError),
123}
124
125impl From<StreamerError> for CheckTailError {
126 fn from(e: StreamerError) -> Self {
127 match e {
128 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
129 StreamerError::Storage(e) => Self::Storage(e),
130 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
131 }
132 }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136pub enum AppendError {
137 #[error(transparent)]
138 Storage(#[from] StorageError),
139 #[error(transparent)]
140 TransactionConflict(#[from] TransactionConflictError),
141 #[error(transparent)]
142 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
143 #[error(transparent)]
144 RequestDroppedError(#[from] RequestDroppedError),
145 #[error(transparent)]
146 BasinNotFound(#[from] BasinNotFoundError),
147 #[error(transparent)]
148 StreamNotFound(#[from] StreamNotFoundError),
149 #[error(transparent)]
150 BasinDeletionPending(#[from] BasinDeletionPendingError),
151 #[error(transparent)]
152 StreamDeletionPending(#[from] StreamDeletionPendingError),
153 #[error(transparent)]
154 ConditionFailed(#[from] AppendConditionFailedError),
155 #[error(transparent)]
156 TimestampMissing(#[from] AppendTimestampRequiredError),
157}
158
159impl From<AppendErrorInternal> for AppendError {
160 fn from(e: AppendErrorInternal) -> Self {
161 match e {
162 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
163 AppendErrorInternal::StreamerMissingInActionError(e) => {
164 AppendError::StreamerMissingInActionError(e)
165 }
166 AppendErrorInternal::RequestDroppedError(e) => AppendError::RequestDroppedError(e),
167 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
168 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
169 }
170 }
171}
172
173#[derive(Debug, Clone, thiserror::Error)]
174pub enum AppendConditionFailedError {
175 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
176 FencingTokenMismatch {
177 expected: FencingToken,
178 actual: FencingToken,
179 applied_point: RangeTo<SeqNum>,
180 },
181 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
182 SeqNumMismatch {
183 assigned_seq_num: SeqNum,
184 match_seq_num: SeqNum,
185 },
186}
187
188impl AppendConditionFailedError {
189 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
190 use AppendConditionFailedError::*;
191 match self {
192 SeqNumMismatch {
193 assigned_seq_num, ..
194 } => ..*assigned_seq_num,
195 FencingTokenMismatch { applied_point, .. } => *applied_point,
196 }
197 }
198}
199
200impl From<StreamerError> for AppendError {
201 fn from(e: StreamerError) -> Self {
202 match e {
203 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
204 StreamerError::Storage(e) => Self::Storage(e),
205 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
206 }
207 }
208}
209
210#[derive(Debug, Clone, thiserror::Error)]
211pub enum ReadError {
212 #[error(transparent)]
213 Storage(#[from] StorageError),
214 #[error(transparent)]
215 TransactionConflict(#[from] TransactionConflictError),
216 #[error(transparent)]
217 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
218 #[error(transparent)]
219 BasinNotFound(#[from] BasinNotFoundError),
220 #[error(transparent)]
221 StreamNotFound(#[from] StreamNotFoundError),
222 #[error(transparent)]
223 BasinDeletionPending(#[from] BasinDeletionPendingError),
224 #[error(transparent)]
225 StreamDeletionPending(#[from] StreamDeletionPendingError),
226 #[error(transparent)]
227 Unwritten(#[from] UnwrittenError),
228}
229
230impl From<StreamerError> for ReadError {
231 fn from(e: StreamerError) -> Self {
232 match e {
233 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
234 StreamerError::Storage(e) => Self::Storage(e),
235 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
236 }
237 }
238}
239
240impl From<kv::DeserializationError> for ReadError {
241 fn from(e: kv::DeserializationError) -> Self {
242 Self::Storage(e.into())
243 }
244}
245
246impl From<slatedb::Error> for ReadError {
247 fn from(e: slatedb::Error) -> Self {
248 Self::Storage(e.into())
249 }
250}
251
252#[derive(Debug, Clone, thiserror::Error)]
253pub enum ListStreamsError {
254 #[error(transparent)]
255 Storage(#[from] StorageError),
256}
257
258impl From<slatedb::Error> for ListStreamsError {
259 fn from(e: slatedb::Error) -> Self {
260 Self::Storage(e.into())
261 }
262}
263
264impl From<kv::DeserializationError> for ListStreamsError {
265 fn from(e: kv::DeserializationError) -> Self {
266 Self::Storage(e.into())
267 }
268}
269
270#[derive(Debug, Clone, thiserror::Error)]
271pub enum CreateStreamError {
272 #[error(transparent)]
273 Storage(#[from] StorageError),
274 #[error(transparent)]
275 TransactionConflict(#[from] TransactionConflictError),
276 #[error(transparent)]
277 BasinNotFound(#[from] BasinNotFoundError),
278 #[error(transparent)]
279 BasinDeletionPending(#[from] BasinDeletionPendingError),
280 #[error(transparent)]
281 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
282 #[error(transparent)]
283 StreamDeletionPending(#[from] StreamDeletionPendingError),
284}
285
286impl From<slatedb::Error> for CreateStreamError {
287 fn from(err: slatedb::Error) -> Self {
288 if err.kind() == slatedb::ErrorKind::Transaction {
289 Self::TransactionConflict(TransactionConflictError)
290 } else {
291 Self::Storage(err.into())
292 }
293 }
294}
295
296impl From<GetBasinConfigError> for CreateStreamError {
297 fn from(err: GetBasinConfigError) -> Self {
298 match err {
299 GetBasinConfigError::Storage(e) => Self::Storage(e),
300 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
301 }
302 }
303}
304
305#[derive(Debug, Clone, thiserror::Error)]
306pub enum GetStreamConfigError {
307 #[error(transparent)]
308 Storage(#[from] StorageError),
309 #[error(transparent)]
310 StreamNotFound(#[from] StreamNotFoundError),
311}
312
313#[derive(Debug, Clone, thiserror::Error)]
314pub enum DeleteStreamError {
315 #[error(transparent)]
316 Storage(#[from] StorageError),
317 #[error(transparent)]
318 StreamerMissingInActionError(#[from] StreamerMissingInActionError),
319 #[error(transparent)]
320 RequestDroppedError(#[from] RequestDroppedError),
321 #[error(transparent)]
322 StreamNotFound(#[from] StreamNotFoundError),
323}
324
325impl From<slatedb::Error> for DeleteStreamError {
326 fn from(err: slatedb::Error) -> Self {
327 Self::Storage(err.into())
328 }
329}
330
331#[derive(Debug, Clone, thiserror::Error)]
332pub enum ListBasinsError {
333 #[error(transparent)]
334 Storage(#[from] StorageError),
335}
336
337impl From<slatedb::Error> for ListBasinsError {
338 fn from(err: slatedb::Error) -> Self {
339 Self::Storage(err.into())
340 }
341}
342
343impl From<kv::DeserializationError> for ListBasinsError {
344 fn from(e: kv::DeserializationError) -> Self {
345 Self::Storage(e.into())
346 }
347}
348
349#[derive(Debug, Clone, thiserror::Error)]
350pub enum CreateBasinError {
351 #[error(transparent)]
352 Storage(#[from] StorageError),
353 #[error(transparent)]
354 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
355 #[error(transparent)]
356 BasinDeletionPending(#[from] BasinDeletionPendingError),
357}
358
359impl From<slatedb::Error> for CreateBasinError {
360 fn from(err: slatedb::Error) -> Self {
361 Self::Storage(err.into())
362 }
363}
364
365#[derive(Debug, Clone, thiserror::Error)]
366pub enum GetBasinConfigError {
367 #[error(transparent)]
368 Storage(#[from] StorageError),
369 #[error(transparent)]
370 BasinNotFound(#[from] BasinNotFoundError),
371}
372
373#[derive(Debug, Clone, thiserror::Error)]
374pub enum ReconfigureBasinError {
375 #[error(transparent)]
376 Storage(#[from] StorageError),
377 #[error(transparent)]
378 TransactionConflict(#[from] TransactionConflictError),
379 #[error(transparent)]
380 BasinNotFound(#[from] BasinNotFoundError),
381 #[error(transparent)]
382 BasinDeletionPending(#[from] BasinDeletionPendingError),
383}
384
385impl From<slatedb::Error> for ReconfigureBasinError {
386 fn from(err: slatedb::Error) -> Self {
387 if err.kind() == slatedb::ErrorKind::Transaction {
388 Self::TransactionConflict(TransactionConflictError)
389 } else {
390 Self::Storage(err.into())
391 }
392 }
393}
394
395#[derive(Debug, Clone, thiserror::Error)]
396pub enum ReconfigureStreamError {
397 #[error(transparent)]
398 Storage(#[from] StorageError),
399 #[error(transparent)]
400 TransactionConflict(#[from] TransactionConflictError),
401 #[error(transparent)]
402 StreamNotFound(#[from] StreamNotFoundError),
403 #[error(transparent)]
404 StreamDeletionPending(#[from] StreamDeletionPendingError),
405}
406
407impl From<slatedb::Error> for ReconfigureStreamError {
408 fn from(err: slatedb::Error) -> Self {
409 if err.kind() == slatedb::ErrorKind::Transaction {
410 Self::TransactionConflict(TransactionConflictError)
411 } else {
412 Self::Storage(err.into())
413 }
414 }
415}
416
417#[derive(Debug, Clone, thiserror::Error)]
418pub enum DeleteBasinError {
419 #[error(transparent)]
420 Storage(#[from] StorageError),
421 #[error(transparent)]
422 BasinNotFound(#[from] BasinNotFoundError),
423}
424
425impl From<slatedb::Error> for DeleteBasinError {
426 fn from(err: slatedb::Error) -> Self {
427 Self::Storage(err.into())
428 }
429}