1use serde::{Deserialize, Serialize};
2use ursula_proto::{ColdChunkRefV1, ExternalPayloadRefV1, ProducerRequestV1};
3use ursula_shard::BucketStreamId;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
6pub enum StreamStatus {
7 Open,
8 Closed,
9 SoftDeleted,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct StreamMetadata {
14 pub stream_id: BucketStreamId,
15 pub content_type: String,
16 pub status: StreamStatus,
17 pub tail_offset: u64,
18 pub last_stream_seq: Option<String>,
19 pub stream_ttl_seconds: Option<u64>,
20 pub stream_expires_at_ms: Option<u64>,
21 pub created_at_ms: u64,
22 pub last_ttl_touch_at_ms: u64,
23 pub forked_from: Option<BucketStreamId>,
24 pub fork_offset: Option<u64>,
25 pub fork_ref_count: u64,
26}
27
28pub type ProducerRequest = ProducerRequestV1;
29
30#[derive(Debug)]
31pub struct AppendStreamInput<'a> {
32 pub stream_id: BucketStreamId,
33 pub content_type: Option<&'a str>,
34 pub payload: &'a [u8],
35 pub close_after: bool,
36 pub stream_seq: Option<String>,
37 pub producer: Option<ProducerRequest>,
38 pub now_ms: u64,
39}
40
41#[derive(Debug)]
42pub(crate) struct AppendExternalInput<'a> {
43 pub(crate) stream_id: BucketStreamId,
44 pub(crate) content_type: Option<&'a str>,
45 pub(crate) payload: ExternalPayloadRef,
46 pub(crate) close_after: bool,
47 pub(crate) stream_seq: Option<String>,
48 pub(crate) producer: Option<ProducerRequest>,
49 pub(crate) now_ms: u64,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
53pub struct ProducerSnapshot {
54 pub producer_id: String,
55 pub producer_epoch: u64,
56 pub producer_seq: u64,
57 pub last_start_offset: u64,
58 pub last_next_offset: u64,
59 pub last_closed: bool,
60 pub last_items: Vec<ProducerAppendRecord>,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct ProducerAppendRecord {
65 pub start_offset: u64,
66 pub next_offset: u64,
67 pub closed: bool,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub(crate) struct ProducerState {
72 pub(crate) producer_epoch: u64,
73 pub(crate) producer_seq: u64,
74 pub(crate) last_start_offset: u64,
75 pub(crate) last_next_offset: u64,
76 pub(crate) last_closed: bool,
77 pub(crate) last_items: Vec<ProducerAppendRecord>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct StreamBatchAppend {
82 pub items: Vec<StreamBatchAppendItem>,
83 pub deduplicated: bool,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct StreamBatchAppendItem {
88 pub offset: u64,
89 pub next_offset: u64,
90 pub closed: bool,
91 pub deduplicated: bool,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct StreamRead {
96 pub offset: u64,
97 pub next_offset: u64,
98 pub content_type: String,
99 pub payload: Vec<u8>,
100 pub up_to_date: bool,
101 pub closed: bool,
102}
103
104pub type ColdChunkRef = ColdChunkRefV1;
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107pub struct ObjectPayloadRef {
108 pub start_offset: u64,
109 pub end_offset: u64,
110 pub s3_path: String,
111 pub object_size: u64,
112}
113
114impl From<&ColdChunkRef> for ObjectPayloadRef {
115 fn from(chunk: &ColdChunkRef) -> Self {
116 Self {
117 start_offset: chunk.start_offset,
118 end_offset: chunk.end_offset,
119 s3_path: chunk.s3_path.clone(),
120 object_size: chunk.object_size,
121 }
122 }
123}
124
125pub type ExternalPayloadRef = ExternalPayloadRefV1;
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
128pub struct HotPayloadSegment {
129 pub start_offset: u64,
130 pub end_offset: u64,
131 pub payload_start: usize,
132 pub payload_end: usize,
133}
134
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct ColdFlushCandidate {
137 pub stream_id: BucketStreamId,
138 pub start_offset: u64,
139 pub end_offset: u64,
140 pub payload: Vec<u8>,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct StreamReadColdSegment {
145 pub chunk: ColdChunkRef,
146 pub read_start_offset: u64,
147 pub len: usize,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct StreamReadObjectSegment {
152 pub object: ObjectPayloadRef,
153 pub read_start_offset: u64,
154 pub len: usize,
155}
156
157#[derive(Debug, Clone, PartialEq, Eq)]
158pub enum StreamReadSegment {
159 Object(StreamReadObjectSegment),
160 Hot(Vec<u8>),
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct StreamReadPlan {
165 pub offset: u64,
166 pub next_offset: u64,
167 pub content_type: String,
168 pub segments: Vec<StreamReadSegment>,
169 pub up_to_date: bool,
170 pub closed: bool,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174pub struct StreamMessageRecord {
175 pub start_offset: u64,
176 pub end_offset: u64,
177}
178
179#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
180pub struct StreamVisibleSnapshot {
181 pub offset: u64,
182 pub content_type: String,
183 pub payload: Vec<u8>,
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub struct StreamBootstrapPlan {
188 pub snapshot: Option<StreamVisibleSnapshot>,
189 pub updates: Vec<StreamMessageRecord>,
190 pub next_offset: u64,
191 pub content_type: String,
192 pub up_to_date: bool,
193 pub closed: bool,
194}