// ursula_stream/response.rs

use serde::{Deserialize, Serialize};
use ursula_shard::BucketStreamId;

use crate::model::ProducerRequest;

6#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum StreamResponse {
8    BucketCreated {
9        bucket_id: String,
10    },
11    BucketAlreadyExists {
12        bucket_id: String,
13    },
14    BucketDeleted {
15        bucket_id: String,
16    },
17    Created {
18        stream_id: BucketStreamId,
19        next_offset: u64,
20        closed: bool,
21    },
22    AlreadyExists {
23        next_offset: u64,
24        closed: bool,
25        content_type: String,
26        stream_ttl_seconds: Option<u64>,
27        stream_expires_at_ms: Option<u64>,
28    },
29    Appended {
30        offset: u64,
31        next_offset: u64,
32        closed: bool,
33        deduplicated: bool,
34        producer: Option<ProducerRequest>,
35    },
36    Closed {
37        next_offset: u64,
38        deduplicated: bool,
39        producer: Option<ProducerRequest>,
40    },
41    Deleted {
42        hard_deleted: bool,
43        parent_to_release: Option<BucketStreamId>,
44    },
45    ForkRefAdded {
46        fork_ref_count: u64,
47    },
48    ForkRefReleased {
49        hard_deleted: bool,
50        fork_ref_count: u64,
51        parent_to_release: Option<BucketStreamId>,
52    },
53    ColdFlushed {
54        hot_start_offset: u64,
55    },
56    SnapshotPublished {
57        snapshot_offset: u64,
58    },
59    Accessed {
60        changed: bool,
61        expired: bool,
62    },
63    Error {
64        code: StreamErrorCode,
65        message: String,
66        next_offset: Option<u64>,
67    },
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
71pub enum StreamErrorCode {
72    InvalidBucketId,
73    InvalidStreamId,
74    BucketNotFound,
75    BucketNotEmpty,
76    StreamNotFound,
77    StreamGone,
78    StreamAlreadyExistsConflict,
79    MissingContentType,
80    ContentTypeMismatch,
81    EmptyAppend,
82    StreamClosed,
83    StreamSeqConflict,
84    InvalidProducer,
85    ProducerEpochStale,
86    ProducerSeqConflict,
87    InvalidRetention,
88    InvalidFork,
89    OffsetOutOfRange,
90    InvalidColdFlush,
91    InvalidSnapshot,
92    SnapshotNotFound,
93    SnapshotConflict,
94}
95
96impl StreamResponse {
97    pub(crate) fn error(code: StreamErrorCode, message: impl Into<String>) -> Self {
98        Self::Error {
99            code,
100            message: message.into(),
101            next_offset: None,
102        }
103    }
104
105    pub(crate) fn error_with_next_offset(
106        code: StreamErrorCode,
107        message: impl Into<String>,
108        next_offset: u64,
109    ) -> Self {
110        Self::Error {
111            code,
112            message: message.into(),
113            next_offset: Some(next_offset),
114        }
115    }
116}