Skip to main content

ursula_stream/
model.rs

1use serde::{Deserialize, Serialize};
2use ursula_proto::{ColdChunkRefV1, ExternalPayloadRefV1, ProducerRequestV1};
3use ursula_shard::BucketStreamId;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
6pub enum StreamStatus {
7    Open,
8    Closed,
9    SoftDeleted,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct StreamMetadata {
14    pub stream_id: BucketStreamId,
15    pub content_type: String,
16    pub status: StreamStatus,
17    pub tail_offset: u64,
18    pub last_stream_seq: Option<String>,
19    pub stream_ttl_seconds: Option<u64>,
20    pub stream_expires_at_ms: Option<u64>,
21    pub created_at_ms: u64,
22    pub last_ttl_touch_at_ms: u64,
23    pub forked_from: Option<BucketStreamId>,
24    pub fork_offset: Option<u64>,
25    pub fork_ref_count: u64,
26}
27
28pub type ProducerRequest = ProducerRequestV1;
29
30#[derive(Debug)]
31pub struct AppendStreamInput<'a> {
32    pub stream_id: BucketStreamId,
33    pub content_type: Option<&'a str>,
34    pub payload: &'a [u8],
35    pub close_after: bool,
36    pub stream_seq: Option<String>,
37    pub producer: Option<ProducerRequest>,
38    pub now_ms: u64,
39}
40
41#[derive(Debug)]
42pub(crate) struct AppendExternalInput<'a> {
43    pub(crate) stream_id: BucketStreamId,
44    pub(crate) content_type: Option<&'a str>,
45    pub(crate) payload: ExternalPayloadRef,
46    pub(crate) close_after: bool,
47    pub(crate) stream_seq: Option<String>,
48    pub(crate) producer: Option<ProducerRequest>,
49    pub(crate) now_ms: u64,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
53pub struct ProducerSnapshot {
54    pub producer_id: String,
55    pub producer_epoch: u64,
56    pub producer_seq: u64,
57    pub last_start_offset: u64,
58    pub last_next_offset: u64,
59    pub last_closed: bool,
60    pub last_items: Vec<ProducerAppendRecord>,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct ProducerAppendRecord {
65    pub start_offset: u64,
66    pub next_offset: u64,
67    pub closed: bool,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub(crate) struct ProducerState {
72    pub(crate) producer_epoch: u64,
73    pub(crate) producer_seq: u64,
74    pub(crate) last_start_offset: u64,
75    pub(crate) last_next_offset: u64,
76    pub(crate) last_closed: bool,
77    pub(crate) last_items: Vec<ProducerAppendRecord>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct StreamBatchAppend {
82    pub items: Vec<StreamBatchAppendItem>,
83    pub deduplicated: bool,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct StreamBatchAppendItem {
88    pub offset: u64,
89    pub next_offset: u64,
90    pub closed: bool,
91    pub deduplicated: bool,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct StreamRead {
96    pub offset: u64,
97    pub next_offset: u64,
98    pub content_type: String,
99    pub payload: Vec<u8>,
100    pub up_to_date: bool,
101    pub closed: bool,
102}
103
104pub type ColdChunkRef = ColdChunkRefV1;
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107pub struct ObjectPayloadRef {
108    pub start_offset: u64,
109    pub end_offset: u64,
110    pub s3_path: String,
111    pub object_size: u64,
112}
113
114impl From<&ColdChunkRef> for ObjectPayloadRef {
115    fn from(chunk: &ColdChunkRef) -> Self {
116        Self {
117            start_offset: chunk.start_offset,
118            end_offset: chunk.end_offset,
119            s3_path: chunk.s3_path.clone(),
120            object_size: chunk.object_size,
121        }
122    }
123}
124
125pub type ExternalPayloadRef = ExternalPayloadRefV1;
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
128pub struct HotPayloadSegment {
129    pub start_offset: u64,
130    pub end_offset: u64,
131    pub payload_start: usize,
132    pub payload_end: usize,
133}
134
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct ColdFlushCandidate {
137    pub stream_id: BucketStreamId,
138    pub start_offset: u64,
139    pub end_offset: u64,
140    pub payload: Vec<u8>,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct StreamReadColdSegment {
145    pub chunk: ColdChunkRef,
146    pub read_start_offset: u64,
147    pub len: usize,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct StreamReadObjectSegment {
152    pub object: ObjectPayloadRef,
153    pub read_start_offset: u64,
154    pub len: usize,
155}
156
157#[derive(Debug, Clone, PartialEq, Eq)]
158pub enum StreamReadSegment {
159    Object(StreamReadObjectSegment),
160    Hot(Vec<u8>),
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct StreamReadPlan {
165    pub offset: u64,
166    pub next_offset: u64,
167    pub content_type: String,
168    pub segments: Vec<StreamReadSegment>,
169    pub up_to_date: bool,
170    pub closed: bool,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174pub struct StreamMessageRecord {
175    pub start_offset: u64,
176    pub end_offset: u64,
177}
178
179#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
180pub struct StreamVisibleSnapshot {
181    pub offset: u64,
182    pub content_type: String,
183    pub payload: Vec<u8>,
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub struct StreamBootstrapPlan {
188    pub snapshot: Option<StreamVisibleSnapshot>,
189    pub updates: Vec<StreamMessageRecord>,
190    pub next_offset: u64,
191    pub content_type: String,
192    pub up_to_date: bool,
193    pub closed: bool,
194}