1use std::{ops::RangeTo, sync::Arc};
2
3use s2_common::{
4 record::{FencingToken, SeqNum, StreamPosition},
5 types::{basin::BasinName, stream::StreamName},
6};
7
8use crate::backend::kv;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum StorageError {
12 #[error("deserialization: {0}")]
13 Deserialization(#[from] kv::DeserializationError),
14 #[error("database: {0}")]
15 Database(Arc<slatedb::Error>),
16}
17
18impl From<slatedb::Error> for StorageError {
19 fn from(error: slatedb::Error) -> Self {
20 StorageError::Database(Arc::new(error))
21 }
22}
23
24#[derive(Debug, Clone, thiserror::Error)]
25#[error("basin `{basin}` not found")]
26pub struct BasinNotFoundError {
27 pub basin: BasinName,
28}
29
30#[derive(Debug, Clone, thiserror::Error)]
31#[error("stream `{stream}` in basin `{basin}` not found")]
32pub struct StreamNotFoundError {
33 pub basin: BasinName,
34 pub stream: StreamName,
35}
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[error("basin `{basin}` already exists")]
39pub struct BasinAlreadyExistsError {
40 pub basin: BasinName,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44#[error("stream `{stream}` in basin `{basin}` already exists")]
45pub struct StreamAlreadyExistsError {
46 pub basin: BasinName,
47 pub stream: StreamName,
48}
49
50#[derive(Debug, Clone, thiserror::Error)]
51#[error("basin `{basin}` is being deleted")]
52pub struct BasinDeletionPendingError {
53 pub basin: BasinName,
54}
55
56#[derive(Debug, Clone, thiserror::Error)]
57#[error("stream `{stream}` in basin `{basin}` is being deleted")]
58pub struct StreamDeletionPendingError {
59 pub basin: BasinName,
60 pub stream: StreamName,
61}
62
63#[derive(Debug, Clone, thiserror::Error)]
64#[error("unwritten position: {0}")]
65pub struct UnwrittenError(pub StreamPosition);
66
67#[derive(Debug, Clone, thiserror::Error)]
68pub enum UnavailableError {
69 #[error("unavailable: missing in action")]
70 MissingInAction,
71 #[error("unavailable: request drop")]
72 RequestDrop,
73}
74
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("record timestamp was required but was missing")]
77pub struct AppendTimestampRequiredError;
78
79#[derive(Debug, Clone, thiserror::Error)]
80#[error("transaction conflict occurred – this is usually retriable")]
81pub struct TransactionConflictError;
82
83#[derive(Debug, Clone, thiserror::Error)]
84pub(super) enum StreamerError {
85 #[error(transparent)]
86 Storage(#[from] StorageError),
87 #[error(transparent)]
88 StreamNotFound(#[from] StreamNotFoundError),
89 #[error(transparent)]
90 StreamDeletionPending(#[from] StreamDeletionPendingError),
91}
92
93#[derive(Debug, Clone, thiserror::Error)]
94pub(super) enum AppendErrorInternal {
95 #[error(transparent)]
96 Storage(#[from] StorageError),
97 #[error(transparent)]
98 Unavailable(#[from] UnavailableError),
99 #[error(transparent)]
100 ConditionFailed(#[from] AppendConditionFailedError),
101 #[error(transparent)]
102 TimestampMissing(#[from] AppendTimestampRequiredError),
103}
104
105#[derive(Debug, Clone, thiserror::Error)]
106pub enum CheckTailError {
107 #[error(transparent)]
108 Storage(#[from] StorageError),
109 #[error(transparent)]
110 TransactionConflict(#[from] TransactionConflictError),
111 #[error(transparent)]
112 Unavailable(#[from] UnavailableError),
113 #[error(transparent)]
114 BasinNotFound(#[from] BasinNotFoundError),
115 #[error(transparent)]
116 StreamNotFound(#[from] StreamNotFoundError),
117 #[error(transparent)]
118 BasinDeletionPending(#[from] BasinDeletionPendingError),
119 #[error(transparent)]
120 StreamDeletionPending(#[from] StreamDeletionPendingError),
121}
122
123impl From<StreamerError> for CheckTailError {
124 fn from(e: StreamerError) -> Self {
125 match e {
126 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
127 StreamerError::Storage(e) => Self::Storage(e),
128 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
129 }
130 }
131}
132
133#[derive(Debug, Clone, thiserror::Error)]
134pub enum AppendError {
135 #[error(transparent)]
136 Storage(#[from] StorageError),
137 #[error(transparent)]
138 TransactionConflict(#[from] TransactionConflictError),
139 #[error(transparent)]
140 Unavailable(#[from] UnavailableError),
141 #[error(transparent)]
142 BasinNotFound(#[from] BasinNotFoundError),
143 #[error(transparent)]
144 StreamNotFound(#[from] StreamNotFoundError),
145 #[error(transparent)]
146 BasinDeletionPending(#[from] BasinDeletionPendingError),
147 #[error(transparent)]
148 StreamDeletionPending(#[from] StreamDeletionPendingError),
149 #[error(transparent)]
150 ConditionFailed(#[from] AppendConditionFailedError),
151 #[error(transparent)]
152 TimestampMissing(#[from] AppendTimestampRequiredError),
153}
154
155impl From<AppendErrorInternal> for AppendError {
156 fn from(e: AppendErrorInternal) -> Self {
157 match e {
158 AppendErrorInternal::Storage(e) => AppendError::Storage(e),
159 AppendErrorInternal::Unavailable(e) => AppendError::Unavailable(e),
160 AppendErrorInternal::ConditionFailed(e) => AppendError::ConditionFailed(e),
161 AppendErrorInternal::TimestampMissing(e) => AppendError::TimestampMissing(e),
162 }
163 }
164}
165
166#[derive(Debug, Clone, thiserror::Error)]
167pub enum AppendConditionFailedError {
168 #[error("fencing token mismatch: expected `{expected}`, actual `{actual}`")]
169 FencingTokenMismatch {
170 expected: FencingToken,
171 actual: FencingToken,
172 applied_point: RangeTo<SeqNum>,
173 },
174 #[error("sequence number mismatch: expected {match_seq_num}, actual {assigned_seq_num}")]
175 SeqNumMismatch {
176 assigned_seq_num: SeqNum,
177 match_seq_num: SeqNum,
178 },
179}
180
181impl AppendConditionFailedError {
182 pub fn durability_dependency(&self) -> RangeTo<SeqNum> {
183 use AppendConditionFailedError::*;
184 match self {
185 SeqNumMismatch {
186 assigned_seq_num, ..
187 } => ..*assigned_seq_num,
188 FencingTokenMismatch { applied_point, .. } => *applied_point,
189 }
190 }
191}
192
193impl From<StreamerError> for AppendError {
194 fn from(e: StreamerError) -> Self {
195 match e {
196 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
197 StreamerError::Storage(e) => Self::Storage(e),
198 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
199 }
200 }
201}
202
203#[derive(Debug, Clone, thiserror::Error)]
204pub enum ReadError {
205 #[error(transparent)]
206 Storage(#[from] StorageError),
207 #[error(transparent)]
208 TransactionConflict(#[from] TransactionConflictError),
209 #[error(transparent)]
210 Unavailable(#[from] UnavailableError),
211 #[error(transparent)]
212 BasinNotFound(#[from] BasinNotFoundError),
213 #[error(transparent)]
214 StreamNotFound(#[from] StreamNotFoundError),
215 #[error(transparent)]
216 BasinDeletionPending(#[from] BasinDeletionPendingError),
217 #[error(transparent)]
218 StreamDeletionPending(#[from] StreamDeletionPendingError),
219 #[error(transparent)]
220 Unwritten(#[from] UnwrittenError),
221}
222
223impl From<StreamerError> for ReadError {
224 fn from(e: StreamerError) -> Self {
225 match e {
226 StreamerError::StreamNotFound(e) => Self::StreamNotFound(e),
227 StreamerError::Storage(e) => Self::Storage(e),
228 StreamerError::StreamDeletionPending(e) => Self::StreamDeletionPending(e),
229 }
230 }
231}
232
233impl From<kv::DeserializationError> for ReadError {
234 fn from(e: kv::DeserializationError) -> Self {
235 Self::Storage(e.into())
236 }
237}
238
239impl From<slatedb::Error> for ReadError {
240 fn from(e: slatedb::Error) -> Self {
241 Self::Storage(e.into())
242 }
243}
244
245#[derive(Debug, Clone, thiserror::Error)]
246pub enum ListStreamsError {
247 #[error(transparent)]
248 Storage(#[from] StorageError),
249}
250
251impl From<slatedb::Error> for ListStreamsError {
252 fn from(e: slatedb::Error) -> Self {
253 Self::Storage(e.into())
254 }
255}
256
257impl From<kv::DeserializationError> for ListStreamsError {
258 fn from(e: kv::DeserializationError) -> Self {
259 Self::Storage(e.into())
260 }
261}
262
263#[derive(Debug, Clone, thiserror::Error)]
264pub enum CreateStreamError {
265 #[error(transparent)]
266 Storage(#[from] StorageError),
267 #[error(transparent)]
268 TransactionConflict(#[from] TransactionConflictError),
269 #[error(transparent)]
270 Unavailable(#[from] UnavailableError),
271 #[error(transparent)]
272 BasinNotFound(#[from] BasinNotFoundError),
273 #[error(transparent)]
274 BasinDeletionPending(#[from] BasinDeletionPendingError),
275 #[error(transparent)]
276 StreamAlreadyExists(#[from] StreamAlreadyExistsError),
277 #[error(transparent)]
278 StreamDeletionPending(#[from] StreamDeletionPendingError),
279}
280
281impl From<slatedb::Error> for CreateStreamError {
282 fn from(err: slatedb::Error) -> Self {
283 if err.kind() == slatedb::ErrorKind::Transaction {
284 Self::TransactionConflict(TransactionConflictError)
285 } else {
286 Self::Storage(err.into())
287 }
288 }
289}
290
291impl From<GetBasinConfigError> for CreateStreamError {
292 fn from(err: GetBasinConfigError) -> Self {
293 match err {
294 GetBasinConfigError::Storage(e) => Self::Storage(e),
295 GetBasinConfigError::BasinNotFound(e) => Self::BasinNotFound(e),
296 }
297 }
298}
299
300#[derive(Debug, Clone, thiserror::Error)]
301pub enum GetStreamConfigError {
302 #[error(transparent)]
303 Storage(#[from] StorageError),
304 #[error(transparent)]
305 StreamNotFound(#[from] StreamNotFoundError),
306}
307
308#[derive(Debug, Clone, thiserror::Error)]
309pub enum DeleteStreamError {
310 #[error(transparent)]
311 Storage(#[from] StorageError),
312 #[error(transparent)]
313 Unavailable(#[from] UnavailableError),
314 #[error(transparent)]
315 StreamNotFound(#[from] StreamNotFoundError),
316}
317
318impl From<slatedb::Error> for DeleteStreamError {
319 fn from(err: slatedb::Error) -> Self {
320 Self::Storage(err.into())
321 }
322}
323
324#[derive(Debug, Clone, thiserror::Error)]
325pub enum ListBasinsError {
326 #[error(transparent)]
327 Storage(#[from] StorageError),
328}
329
330impl From<slatedb::Error> for ListBasinsError {
331 fn from(err: slatedb::Error) -> Self {
332 Self::Storage(err.into())
333 }
334}
335
336impl From<kv::DeserializationError> for ListBasinsError {
337 fn from(e: kv::DeserializationError) -> Self {
338 Self::Storage(e.into())
339 }
340}
341
342#[derive(Debug, Clone, thiserror::Error)]
343pub enum CreateBasinError {
344 #[error(transparent)]
345 Storage(#[from] StorageError),
346 #[error(transparent)]
347 BasinAlreadyExists(#[from] BasinAlreadyExistsError),
348 #[error(transparent)]
349 BasinDeletionPending(#[from] BasinDeletionPendingError),
350}
351
352impl From<slatedb::Error> for CreateBasinError {
353 fn from(err: slatedb::Error) -> Self {
354 Self::Storage(err.into())
355 }
356}
357
358#[derive(Debug, Clone, thiserror::Error)]
359pub enum GetBasinConfigError {
360 #[error(transparent)]
361 Storage(#[from] StorageError),
362 #[error(transparent)]
363 BasinNotFound(#[from] BasinNotFoundError),
364}
365
366#[derive(Debug, Clone, thiserror::Error)]
367pub enum ReconfigureBasinError {
368 #[error(transparent)]
369 Storage(#[from] StorageError),
370 #[error(transparent)]
371 TransactionConflict(#[from] TransactionConflictError),
372 #[error(transparent)]
373 BasinNotFound(#[from] BasinNotFoundError),
374 #[error(transparent)]
375 BasinDeletionPending(#[from] BasinDeletionPendingError),
376}
377
378impl From<slatedb::Error> for ReconfigureBasinError {
379 fn from(err: slatedb::Error) -> Self {
380 if err.kind() == slatedb::ErrorKind::Transaction {
381 Self::TransactionConflict(TransactionConflictError)
382 } else {
383 Self::Storage(err.into())
384 }
385 }
386}
387
388#[derive(Debug, Clone, thiserror::Error)]
389pub enum ReconfigureStreamError {
390 #[error(transparent)]
391 Storage(#[from] StorageError),
392 #[error(transparent)]
393 TransactionConflict(#[from] TransactionConflictError),
394 #[error(transparent)]
395 Unavailable(#[from] UnavailableError),
396 #[error(transparent)]
397 StreamNotFound(#[from] StreamNotFoundError),
398 #[error(transparent)]
399 StreamDeletionPending(#[from] StreamDeletionPendingError),
400}
401
402impl From<slatedb::Error> for ReconfigureStreamError {
403 fn from(err: slatedb::Error) -> Self {
404 if err.kind() == slatedb::ErrorKind::Transaction {
405 Self::TransactionConflict(TransactionConflictError)
406 } else {
407 Self::Storage(err.into())
408 }
409 }
410}
411
412#[derive(Debug, Clone, thiserror::Error)]
413pub enum DeleteBasinError {
414 #[error(transparent)]
415 Storage(#[from] StorageError),
416 #[error(transparent)]
417 BasinNotFound(#[from] BasinNotFoundError),
418}
419
420impl From<slatedb::Error> for DeleteBasinError {
421 fn from(err: slatedb::Error) -> Self {
422 Self::Storage(err.into())
423 }
424}