1#[cfg(test)]
2use std::sync::Arc;
3
4use bytes::Bytes;
5use serde::{Deserialize, Serialize};
6use ursula_shard::{BucketStreamId, ShardPlacement};
7use ursula_stream::{
8 ColdChunkRef, ExternalPayloadRef, ProducerRequest, StreamReadPlan, StreamReadSegment,
9};
10
11use crate::cold_store::{ColdStoreHandle, DEFAULT_CONTENT_TYPE};
12use crate::engine::GroupEngineError;
13use crate::engine_in_memory::InMemoryGroupEngine;
14use crate::error::RuntimeError;
15
/// Parameters for creating a stream, with the initial payload carried inline as [`Bytes`].
///
/// [`CreateStreamRequest::new`] sets only `stream_id` and `content_type` from its
/// arguments; every other field starts at a fixed default and is public so callers can
/// overwrite it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Content type string for the stream.
    pub content_type: String,
    /// Set to `true` by `new`. Presumably `false` marks a defaulted content type;
    /// confirm with the code that consumes this flag.
    pub content_type_explicit: bool,
    /// Inline payload bytes; `new` leaves this empty.
    pub initial_payload: Bytes,
    /// Presumably requests that the stream be closed as part of this call; confirm.
    pub close_after: bool,
    /// Optional sequence token. NOTE(review): its semantics are not visible in this file.
    pub stream_seq: Option<String>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Optional TTL, in seconds per the field name.
    pub stream_ttl_seconds: Option<u64>,
    /// Optional expiry, in milliseconds per the field name.
    pub stream_expires_at_ms: Option<u64>,
    /// Optional source stream when the new stream is a fork (inferred from the name).
    pub forked_from: Option<BucketStreamId>,
    /// Optional offset associated with `forked_from` (inferred from the name).
    pub fork_offset: Option<u64>,
    /// Caller-supplied time in milliseconds per the field name; `new` sets it to 0.
    pub now_ms: u64,
}
31
/// Serializable counterpart of [`CreateStreamRequest`] whose initial payload is an
/// [`ExternalPayloadRef`] rather than inline bytes.
///
/// It has no `content_type_explicit` field; all other fields mirror
/// [`CreateStreamRequest`]. Field order is left untouched because the type derives
/// `Serialize`/`Deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateStreamExternalRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Content type string for the stream.
    pub content_type: String,
    /// Reference to the initial payload instead of the bytes themselves.
    pub initial_payload: ExternalPayloadRef,
    /// Presumably requests that the stream be closed as part of this call; confirm.
    pub close_after: bool,
    /// Optional sequence token. NOTE(review): its semantics are not visible in this file.
    pub stream_seq: Option<String>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Optional TTL, in seconds per the field name.
    pub stream_ttl_seconds: Option<u64>,
    /// Optional expiry, in milliseconds per the field name.
    pub stream_expires_at_ms: Option<u64>,
    /// Optional source stream when the new stream is a fork (inferred from the name).
    pub forked_from: Option<BucketStreamId>,
    /// Optional offset associated with `forked_from` (inferred from the name).
    pub fork_offset: Option<u64>,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
46
47impl CreateStreamExternalRequest {
48 pub fn from_create_request(
49 request: CreateStreamRequest,
50 initial_payload: ExternalPayloadRef,
51 ) -> Self {
52 Self {
53 stream_id: request.stream_id,
54 content_type: request.content_type,
55 initial_payload,
56 close_after: request.close_after,
57 stream_seq: request.stream_seq,
58 producer: request.producer,
59 stream_ttl_seconds: request.stream_ttl_seconds,
60 stream_expires_at_ms: request.stream_expires_at_ms,
61 forked_from: request.forked_from,
62 fork_offset: request.fork_offset,
63 now_ms: request.now_ms,
64 }
65 }
66}
67
68impl CreateStreamRequest {
69 pub fn new(stream_id: BucketStreamId, content_type: impl Into<String>) -> Self {
70 Self {
71 stream_id,
72 content_type: content_type.into(),
73 content_type_explicit: true,
74 initial_payload: Bytes::new(),
75 close_after: false,
76 stream_seq: None,
77 producer: None,
78 stream_ttl_seconds: None,
79 stream_expires_at_ms: None,
80 forked_from: None,
81 fork_offset: None,
82 now_ms: 0,
83 }
84 }
85}
86
/// Serializable result of a create-stream call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Next offset of the stream after the call (inferred from the name).
    pub next_offset: u64,
    /// Whether the stream is closed.
    pub closed: bool,
    /// Presumably `true` when the stream existed before this call; confirm with the engine.
    pub already_exists: bool,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
}
95
/// Parameters for a head lookup on a stream; the matching [`HeadStreamResponse`] carries
/// metadata only and no payload field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
101
/// Metadata returned for a head lookup; contains no payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Content type string of the stream.
    pub content_type: String,
    /// Tail offset of the stream (inferred from the name).
    pub tail_offset: u64,
    /// Whether the stream is closed.
    pub closed: bool,
    /// Optional TTL, in seconds per the field name.
    pub stream_ttl_seconds: Option<u64>,
    /// Optional expiry, in milliseconds per the field name.
    pub stream_expires_at_ms: Option<u64>,
    /// Optional snapshot offset. Presumably `None` when no snapshot exists; confirm.
    pub snapshot_offset: Option<u64>,
}
112
/// Parameters for reading from a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Offset to read from (inferred from the name).
    pub offset: u64,
    /// Upper bound on the read length. NOTE(review): presumably bytes; confirm the unit.
    pub max_len: usize,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
120
/// Fully materialized result of a stream read, with the payload held as a `Vec<u8>`.
///
/// [`GroupReadStreamParts::into_response`] produces this type, and
/// [`GroupReadStreamParts::from_response`] wraps it back up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Offset associated with the read (inferred from the name: where the payload starts).
    pub offset: u64,
    /// Next offset after the returned payload (inferred from the name).
    pub next_offset: u64,
    /// Content type string of the stream.
    pub content_type: String,
    /// Payload bytes returned by the read.
    pub payload: Vec<u8>,
    /// Presumably `true` when the read reached the current tail; confirm with the engine.
    pub up_to_date: bool,
    /// Whether the stream is closed.
    pub closed: bool,
}
131
/// Body of a group read, either already in memory or still to be resolved.
///
/// [`GroupReadStreamParts::into_response`] turns each variant into payload bytes.
pub enum GroupReadStreamBody {
    /// Payload bytes that are already available.
    Materialized(Vec<u8>),
    /// A read plan that `into_response` resolves through
    /// `InMemoryGroupEngine::read_payload_from_plan`, using the optional cold store handle.
    Planned {
        /// Stream the plan belongs to.
        stream_id: BucketStreamId,
        /// Plan describing the segments to read.
        plan: StreamReadPlan,
        /// Optional cold store handle passed along when the plan is resolved.
        cold_store: Option<ColdStoreHandle>,
    },
    /// Test-only body: `into_response` notifies `entered`, waits for `release`, and then
    /// yields `payload`.
    #[cfg(test)]
    Blocking {
        /// Notified once when resolution of this body starts.
        entered: Arc<tokio::sync::Notify>,
        /// Awaited before the payload is returned.
        release: Arc<tokio::sync::Notify>,
        /// Bytes returned after `release` fires.
        payload: Vec<u8>,
    },
}
146
/// Read result whose metadata is available immediately while the body may still need to
/// be resolved.
///
/// The fields mirror [`ReadStreamResponse`], with `body` standing in for the
/// materialized `payload`.
pub struct GroupReadStreamParts {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Offset associated with the read.
    pub offset: u64,
    /// Next offset after the read.
    pub next_offset: u64,
    /// Content type string of the stream.
    pub content_type: String,
    /// Copied from the response or plan this value was built from.
    pub up_to_date: bool,
    /// Whether the stream is closed.
    pub closed: bool,
    /// Payload source; see [`GroupReadStreamBody`].
    pub body: GroupReadStreamBody,
}
156
157impl GroupReadStreamParts {
158 pub fn from_response(response: ReadStreamResponse) -> Self {
159 Self {
160 placement: response.placement,
161 offset: response.offset,
162 next_offset: response.next_offset,
163 content_type: response.content_type,
164 up_to_date: response.up_to_date,
165 closed: response.closed,
166 body: GroupReadStreamBody::Materialized(response.payload),
167 }
168 }
169
170 pub fn from_plan(
171 placement: ShardPlacement,
172 stream_id: BucketStreamId,
173 plan: StreamReadPlan,
174 cold_store: Option<ColdStoreHandle>,
175 ) -> Self {
176 Self {
177 placement,
178 offset: plan.offset,
179 next_offset: plan.next_offset,
180 content_type: plan.content_type.clone(),
181 up_to_date: plan.up_to_date,
182 closed: plan.closed,
183 body: GroupReadStreamBody::Planned {
184 stream_id,
185 plan,
186 cold_store,
187 },
188 }
189 }
190
191 pub async fn into_response(self) -> Result<ReadStreamResponse, GroupEngineError> {
192 let payload = match &self.body {
193 GroupReadStreamBody::Materialized(payload) => payload.clone(),
194 GroupReadStreamBody::Planned {
195 stream_id,
196 plan,
197 cold_store,
198 } => {
199 InMemoryGroupEngine::read_payload_from_plan(cold_store.as_ref(), stream_id, plan)
200 .await?
201 }
202 #[cfg(test)]
203 GroupReadStreamBody::Blocking {
204 entered,
205 release,
206 payload,
207 } => {
208 entered.notify_one();
209 release.notified().await;
210 payload.clone()
211 }
212 };
213 Ok(ReadStreamResponse {
214 placement: self.placement,
215 offset: self.offset,
216 next_offset: self.next_offset,
217 content_type: self.content_type,
218 payload,
219 up_to_date: self.up_to_date,
220 closed: self.closed,
221 })
222 }
223
224 pub fn payload_is_empty(&self) -> bool {
225 match &self.body {
226 GroupReadStreamBody::Materialized(payload) => payload.is_empty(),
227 GroupReadStreamBody::Planned { plan, .. } => {
228 plan.segments.iter().all(|segment| match segment {
229 StreamReadSegment::Hot(payload) => payload.is_empty(),
230 StreamReadSegment::Object(segment) => segment.len == 0,
231 })
232 }
233 #[cfg(test)]
234 GroupReadStreamBody::Blocking { payload, .. } => payload.is_empty(),
235 }
236 }
237}
238
/// Parameters for publishing a snapshot of a stream, with the snapshot bytes inline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishSnapshotRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Offset the snapshot is associated with.
    pub snapshot_offset: u64,
    /// Content type string of the snapshot payload.
    pub content_type: String,
    /// Snapshot bytes.
    pub payload: Bytes,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
247
/// Serializable result of publishing a snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublishSnapshotResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Offset of the published snapshot.
    pub snapshot_offset: u64,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
}
254
/// Parameters for reading a snapshot of a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadSnapshotRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Optional snapshot offset. NOTE(review): presumably `None` selects the latest
    /// snapshot; confirm with the engine.
    pub snapshot_offset: Option<u64>,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
261
/// Result of reading a snapshot, with the snapshot bytes held as a `Vec<u8>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadSnapshotResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Offset of the returned snapshot.
    pub snapshot_offset: u64,
    /// Next offset reported alongside the snapshot (inferred from the name).
    pub next_offset: u64,
    /// Content type string of the snapshot payload.
    pub content_type: String,
    /// Snapshot bytes.
    pub payload: Vec<u8>,
    /// Presumably `true` when nothing newer than the snapshot exists; confirm.
    pub up_to_date: bool,
}
271
/// Parameters for deleting one snapshot of a stream, identified by its offset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteSnapshotRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Offset of the snapshot to delete.
    pub snapshot_offset: u64,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
278
/// Parameters for a bootstrap call on a stream; the matching
/// [`BootstrapStreamResponse`] carries a snapshot plus a list of updates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
284
/// One update entry inside a [`BootstrapStreamResponse`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapUpdate {
    /// Offset at which this update starts (inferred from the name).
    pub start_offset: u64,
    /// Offset following this update (inferred from the name).
    pub next_offset: u64,
    /// Content type string of the update payload.
    pub content_type: String,
    /// Update bytes.
    pub payload: Vec<u8>,
}
292
/// Result of a bootstrap call: snapshot data plus the updates returned with it.
///
/// NOTE(review): presumably the updates are those that follow the snapshot; the ordering
/// contract is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Optional snapshot offset. Presumably `None` when there is no snapshot; confirm.
    pub snapshot_offset: Option<u64>,
    /// Content type string of the snapshot payload.
    pub snapshot_content_type: String,
    /// Snapshot bytes.
    pub snapshot_payload: Vec<u8>,
    /// Update entries returned with the snapshot.
    pub updates: Vec<BootstrapUpdate>,
    /// Next offset after the returned data (inferred from the name).
    pub next_offset: u64,
    /// Presumably `true` when the response reaches the current tail; confirm.
    pub up_to_date: bool,
    /// Whether the stream is closed.
    pub closed: bool,
}
304
/// Parameters for closing a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CloseStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Optional sequence token. NOTE(review): its semantics are not visible in this file.
    pub stream_seq: Option<String>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
312
/// Serializable result of closing a stream.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CloseStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Next offset of the stream (inferred from the name).
    pub next_offset: u64,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
    /// Presumably `true` when the call was recognized as a duplicate; confirm.
    pub deduplicated: bool,
}
320
/// Parameters for deleting a stream; only the stream id is needed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteStreamRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
}
325
/// Serializable result of deleting a stream.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteStreamResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
    /// Presumably `true` when the stream's data was removed rather than only marked
    /// deleted; confirm with the engine.
    pub hard_deleted: bool,
    /// Optional parent stream. NOTE(review): presumably a fork parent whose reference
    /// should be released by the caller; confirm.
    pub parent_to_release: Option<BucketStreamId>,
}
333
/// Serializable result of an operation on a stream's fork reference count.
///
/// NOTE(review): which operations return this type is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkRefResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Fork reference count reported by the operation.
    pub fork_ref_count: u64,
    /// Presumably `true` when the stream's data was removed; confirm with the engine.
    pub hard_deleted: bool,
    /// Optional parent stream. Presumably one whose reference should be released next; confirm.
    pub parent_to_release: Option<BucketStreamId>,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
}
342
/// Parameters for a cold flush of a stream, carrying a reference to a cold chunk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlushColdRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Reference to the cold chunk involved in the flush.
    pub chunk: ColdChunkRef,
}
348
/// Serializable result of a cold flush.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlushColdResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Presumably the offset at which hot data begins after the flush; confirm.
    pub hot_start_offset: u64,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
}
355
/// Serializable result of a touch-access operation on a stream.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TouchStreamAccessResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Presumably `true` when the touch modified stored state; confirm.
    pub changed: bool,
    /// Presumably `true` when the stream was found to be expired; confirm.
    pub expired: bool,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
}
363
/// Parameters for planning a cold flush of a single stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanColdFlushRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Lower bound on hot bytes. Presumably the threshold below which no flush is planned; confirm.
    pub min_hot_bytes: usize,
    /// Upper bound on the bytes covered by one planned flush (inferred from the name).
    pub max_flush_bytes: usize,
}
370
/// Parameters for planning a cold flush without naming a stream; same thresholds as
/// [`PlanColdFlushRequest`] minus the `stream_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanGroupColdFlushRequest {
    /// Lower bound on hot bytes. Presumably the threshold below which no flush is planned; confirm.
    pub min_hot_bytes: usize,
    /// Upper bound on the bytes covered by one planned flush (inferred from the name).
    pub max_flush_bytes: usize,
}
376
/// Hot-byte counters for one stream and for its group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColdHotBacklog {
    /// Stream the counters refer to.
    pub stream_id: BucketStreamId,
    /// Hot bytes attributed to the stream.
    pub stream_hot_bytes: u64,
    /// Hot bytes attributed to the group (inferred from the name).
    pub group_hot_bytes: u64,
}
383
/// Write-admission settings keyed on a per-group hot-byte limit.
///
/// The `Default` value has no limit, so [`ColdWriteAdmission::is_enabled`] returns
/// `false` for it.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ColdWriteAdmission {
    /// Optional limit on hot bytes per group; `None` means no limit is configured.
    pub max_hot_bytes_per_group: Option<u64>,
}

impl ColdWriteAdmission {
    /// Returns `true` when a limit is configured, including a limit of zero.
    pub(crate) fn is_enabled(self) -> bool {
        let Self {
            max_hot_bytes_per_group: limit,
        } = self;
        limit.is_some()
    }
}
394
/// Parameters for appending one payload to a stream, with the bytes carried inline.
///
/// [`AppendRequest::new`] and [`AppendRequest::from_bytes`] fill in defaults for every
/// field except `stream_id` and `payload`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Content type string; the constructors use `DEFAULT_CONTENT_TYPE`.
    pub content_type: String,
    /// Bytes to append.
    pub payload: Bytes,
    /// Presumably requests that the stream be closed after this append; confirm.
    pub close_after: bool,
    /// Optional sequence token. NOTE(review): its semantics are not visible in this file.
    pub stream_seq: Option<String>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Caller-supplied time in milliseconds per the field name; constructors set it to 0.
    pub now_ms: u64,
}
405
/// Serializable counterpart of [`AppendRequest`] whose payload is an
/// [`ExternalPayloadRef`] rather than inline bytes.
///
/// Field order is left untouched because the type derives `Serialize`/`Deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppendExternalRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Content type string for the payload.
    pub content_type: String,
    /// Reference to the payload instead of the bytes themselves.
    pub payload: ExternalPayloadRef,
    /// Presumably requests that the stream be closed after this append; confirm.
    pub close_after: bool,
    /// Optional sequence token. NOTE(review): its semantics are not visible in this file.
    pub stream_seq: Option<String>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Caller-supplied time in milliseconds per the field name.
    pub now_ms: u64,
}
416
417impl AppendExternalRequest {
418 pub fn from_append_request(request: AppendRequest, payload: ExternalPayloadRef) -> Self {
419 Self {
420 stream_id: request.stream_id,
421 content_type: request.content_type,
422 payload,
423 close_after: request.close_after,
424 stream_seq: request.stream_seq,
425 producer: request.producer,
426 now_ms: request.now_ms,
427 }
428 }
429}
430
431impl AppendRequest {
432 pub fn new(stream_id: BucketStreamId, payload_len: u64) -> Self {
433 Self {
434 stream_id,
435 content_type: DEFAULT_CONTENT_TYPE.to_owned(),
436 payload: Bytes::from(vec![
437 0;
438 usize::try_from(payload_len)
439 .expect("payload_len fits usize")
440 ]),
441 close_after: false,
442 stream_seq: None,
443 producer: None,
444 now_ms: 0,
445 }
446 }
447
448 pub fn from_bytes(stream_id: BucketStreamId, payload: impl Into<Bytes>) -> Self {
449 Self {
450 stream_id,
451 content_type: DEFAULT_CONTENT_TYPE.to_owned(),
452 payload: payload.into(),
453 close_after: false,
454 stream_seq: None,
455 producer: None,
456 now_ms: 0,
457 }
458 }
459
460 pub fn payload_len(&self) -> u64 {
461 u64::try_from(self.payload.len()).expect("payload len fits u64")
462 }
463}
464
/// Parameters for appending several payloads to one stream in a single call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendBatchRequest {
    /// Stream the request refers to.
    pub stream_id: BucketStreamId,
    /// Content type string; `new` uses `DEFAULT_CONTENT_TYPE`.
    pub content_type: String,
    /// Payloads to append, in the order given to `new`.
    pub payloads: Vec<Bytes>,
    /// Optional producer metadata forwarded with the request.
    pub producer: Option<ProducerRequest>,
    /// Caller-supplied time in milliseconds per the field name; `new` sets it to 0.
    pub now_ms: u64,
}
473
474impl AppendBatchRequest {
475 pub fn new<P>(stream_id: BucketStreamId, payloads: Vec<P>) -> Self
476 where
477 P: Into<Bytes>,
478 {
479 Self {
480 stream_id,
481 content_type: DEFAULT_CONTENT_TYPE.to_owned(),
482 payloads: payloads.into_iter().map(Into::into).collect(),
483 producer: None,
484 now_ms: 0,
485 }
486 }
487}
488
/// Serializable result of a single append.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppendResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Offset at which the appended payload starts (inferred from the name).
    pub start_offset: u64,
    /// Offset following the appended payload (inferred from the name).
    pub next_offset: u64,
    /// Append count reported for the stream. NOTE(review): presumably the running total
    /// of appends; confirm with the engine.
    pub stream_append_count: u64,
    /// Commit index of the group for this operation (inferred from the name).
    pub group_commit_index: u64,
    /// Whether the stream is closed.
    pub closed: bool,
    /// Presumably `true` when the append was recognized as a duplicate; confirm.
    pub deduplicated: bool,
    /// Optional producer metadata returned with the result.
    pub producer: Option<ProducerRequest>,
}
500
/// Result of a batch append: one `Result` per item, so individual items can fail with a
/// [`RuntimeError`] independently of the others.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendBatchResponse {
    /// Shard placement reported for the stream.
    pub placement: ShardPlacement,
    /// Per-item outcomes. Presumably in the same order as the request's payloads; confirm.
    pub items: Vec<Result<AppendResponse, RuntimeError>>,
}
506
/// Serializable pairing of a stream id with an append count.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamAppendCount {
    /// Stream the count refers to.
    pub stream_id: BucketStreamId,
    /// Append count for the stream.
    pub append_count: u64,
}