// ursula_runtime/request.rs

#[cfg(test)]
use std::sync::Arc;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use ursula_shard::{BucketStreamId, ShardPlacement};
use ursula_stream::{
    ColdChunkRef, ExternalPayloadRef, ProducerRequest, StreamReadPlan, StreamReadSegment,
};

use crate::cold_store::{ColdStoreHandle, DEFAULT_CONTENT_TYPE};
use crate::engine::GroupEngineError;
use crate::engine_in_memory::InMemoryGroupEngine;
use crate::error::RuntimeError;
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct CreateStreamRequest {
18    pub stream_id: BucketStreamId,
19    pub content_type: String,
20    pub content_type_explicit: bool,
21    pub initial_payload: Bytes,
22    pub close_after: bool,
23    pub stream_seq: Option<String>,
24    pub producer: Option<ProducerRequest>,
25    pub stream_ttl_seconds: Option<u64>,
26    pub stream_expires_at_ms: Option<u64>,
27    pub forked_from: Option<BucketStreamId>,
28    pub fork_offset: Option<u64>,
29    pub now_ms: u64,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct CreateStreamExternalRequest {
34    pub stream_id: BucketStreamId,
35    pub content_type: String,
36    pub initial_payload: ExternalPayloadRef,
37    pub close_after: bool,
38    pub stream_seq: Option<String>,
39    pub producer: Option<ProducerRequest>,
40    pub stream_ttl_seconds: Option<u64>,
41    pub stream_expires_at_ms: Option<u64>,
42    pub forked_from: Option<BucketStreamId>,
43    pub fork_offset: Option<u64>,
44    pub now_ms: u64,
45}
46
47impl CreateStreamExternalRequest {
48    pub fn from_create_request(
49        request: CreateStreamRequest,
50        initial_payload: ExternalPayloadRef,
51    ) -> Self {
52        Self {
53            stream_id: request.stream_id,
54            content_type: request.content_type,
55            initial_payload,
56            close_after: request.close_after,
57            stream_seq: request.stream_seq,
58            producer: request.producer,
59            stream_ttl_seconds: request.stream_ttl_seconds,
60            stream_expires_at_ms: request.stream_expires_at_ms,
61            forked_from: request.forked_from,
62            fork_offset: request.fork_offset,
63            now_ms: request.now_ms,
64        }
65    }
66}
67
68impl CreateStreamRequest {
69    pub fn new(stream_id: BucketStreamId, content_type: impl Into<String>) -> Self {
70        Self {
71            stream_id,
72            content_type: content_type.into(),
73            content_type_explicit: true,
74            initial_payload: Bytes::new(),
75            close_after: false,
76            stream_seq: None,
77            producer: None,
78            stream_ttl_seconds: None,
79            stream_expires_at_ms: None,
80            forked_from: None,
81            fork_offset: None,
82            now_ms: 0,
83        }
84    }
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct CreateStreamResponse {
89    pub placement: ShardPlacement,
90    pub next_offset: u64,
91    pub closed: bool,
92    pub already_exists: bool,
93    pub group_commit_index: u64,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct HeadStreamRequest {
98    pub stream_id: BucketStreamId,
99    pub now_ms: u64,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct HeadStreamResponse {
104    pub placement: ShardPlacement,
105    pub content_type: String,
106    pub tail_offset: u64,
107    pub closed: bool,
108    pub stream_ttl_seconds: Option<u64>,
109    pub stream_expires_at_ms: Option<u64>,
110    pub snapshot_offset: Option<u64>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ReadStreamRequest {
115    pub stream_id: BucketStreamId,
116    pub offset: u64,
117    pub max_len: usize,
118    pub now_ms: u64,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct ReadStreamResponse {
123    pub placement: ShardPlacement,
124    pub offset: u64,
125    pub next_offset: u64,
126    pub content_type: String,
127    pub payload: Vec<u8>,
128    pub up_to_date: bool,
129    pub closed: bool,
130}
131
132pub enum GroupReadStreamBody {
133    Materialized(Vec<u8>),
134    Planned {
135        stream_id: BucketStreamId,
136        plan: StreamReadPlan,
137        cold_store: Option<ColdStoreHandle>,
138    },
139    #[cfg(test)]
140    Blocking {
141        entered: Arc<tokio::sync::Notify>,
142        release: Arc<tokio::sync::Notify>,
143        payload: Vec<u8>,
144    },
145}
146
147pub struct GroupReadStreamParts {
148    pub placement: ShardPlacement,
149    pub offset: u64,
150    pub next_offset: u64,
151    pub content_type: String,
152    pub up_to_date: bool,
153    pub closed: bool,
154    pub body: GroupReadStreamBody,
155}
156
157impl GroupReadStreamParts {
158    pub fn from_response(response: ReadStreamResponse) -> Self {
159        Self {
160            placement: response.placement,
161            offset: response.offset,
162            next_offset: response.next_offset,
163            content_type: response.content_type,
164            up_to_date: response.up_to_date,
165            closed: response.closed,
166            body: GroupReadStreamBody::Materialized(response.payload),
167        }
168    }
169
170    pub fn from_plan(
171        placement: ShardPlacement,
172        stream_id: BucketStreamId,
173        plan: StreamReadPlan,
174        cold_store: Option<ColdStoreHandle>,
175    ) -> Self {
176        Self {
177            placement,
178            offset: plan.offset,
179            next_offset: plan.next_offset,
180            content_type: plan.content_type.clone(),
181            up_to_date: plan.up_to_date,
182            closed: plan.closed,
183            body: GroupReadStreamBody::Planned {
184                stream_id,
185                plan,
186                cold_store,
187            },
188        }
189    }
190
191    pub async fn into_response(self) -> Result<ReadStreamResponse, GroupEngineError> {
192        let payload = match &self.body {
193            GroupReadStreamBody::Materialized(payload) => payload.clone(),
194            GroupReadStreamBody::Planned {
195                stream_id,
196                plan,
197                cold_store,
198            } => {
199                InMemoryGroupEngine::read_payload_from_plan(cold_store.as_ref(), stream_id, plan)
200                    .await?
201            }
202            #[cfg(test)]
203            GroupReadStreamBody::Blocking {
204                entered,
205                release,
206                payload,
207            } => {
208                entered.notify_one();
209                release.notified().await;
210                payload.clone()
211            }
212        };
213        Ok(ReadStreamResponse {
214            placement: self.placement,
215            offset: self.offset,
216            next_offset: self.next_offset,
217            content_type: self.content_type,
218            payload,
219            up_to_date: self.up_to_date,
220            closed: self.closed,
221        })
222    }
223
224    pub fn payload_is_empty(&self) -> bool {
225        match &self.body {
226            GroupReadStreamBody::Materialized(payload) => payload.is_empty(),
227            GroupReadStreamBody::Planned { plan, .. } => {
228                plan.segments.iter().all(|segment| match segment {
229                    StreamReadSegment::Hot(payload) => payload.is_empty(),
230                    StreamReadSegment::Object(segment) => segment.len == 0,
231                })
232            }
233            #[cfg(test)]
234            GroupReadStreamBody::Blocking { payload, .. } => payload.is_empty(),
235        }
236    }
237}
238
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct PublishSnapshotRequest {
241    pub stream_id: BucketStreamId,
242    pub snapshot_offset: u64,
243    pub content_type: String,
244    pub payload: Bytes,
245    pub now_ms: u64,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
249pub struct PublishSnapshotResponse {
250    pub placement: ShardPlacement,
251    pub snapshot_offset: u64,
252    pub group_commit_index: u64,
253}
254
255#[derive(Debug, Clone, PartialEq, Eq)]
256pub struct ReadSnapshotRequest {
257    pub stream_id: BucketStreamId,
258    pub snapshot_offset: Option<u64>,
259    pub now_ms: u64,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq)]
263pub struct ReadSnapshotResponse {
264    pub placement: ShardPlacement,
265    pub snapshot_offset: u64,
266    pub next_offset: u64,
267    pub content_type: String,
268    pub payload: Vec<u8>,
269    pub up_to_date: bool,
270}
271
272#[derive(Debug, Clone, PartialEq, Eq)]
273pub struct DeleteSnapshotRequest {
274    pub stream_id: BucketStreamId,
275    pub snapshot_offset: u64,
276    pub now_ms: u64,
277}
278
279#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct BootstrapStreamRequest {
281    pub stream_id: BucketStreamId,
282    pub now_ms: u64,
283}
284
/// One update entry inside a [`BootstrapStreamResponse`]: a payload together
/// with the offsets and content type reported for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapUpdate {
    pub start_offset: u64,
    pub next_offset: u64,
    pub content_type: String,
    /// Bytes of this update.
    pub payload: Vec<u8>,
}
292
293#[derive(Debug, Clone, PartialEq, Eq)]
294pub struct BootstrapStreamResponse {
295    pub placement: ShardPlacement,
296    pub snapshot_offset: Option<u64>,
297    pub snapshot_content_type: String,
298    pub snapshot_payload: Vec<u8>,
299    pub updates: Vec<BootstrapUpdate>,
300    pub next_offset: u64,
301    pub up_to_date: bool,
302    pub closed: bool,
303}
304
305#[derive(Debug, Clone, PartialEq, Eq)]
306pub struct CloseStreamRequest {
307    pub stream_id: BucketStreamId,
308    pub stream_seq: Option<String>,
309    pub producer: Option<ProducerRequest>,
310    pub now_ms: u64,
311}
312
313#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
314pub struct CloseStreamResponse {
315    pub placement: ShardPlacement,
316    pub next_offset: u64,
317    pub group_commit_index: u64,
318    pub deduplicated: bool,
319}
320
321#[derive(Debug, Clone, PartialEq, Eq)]
322pub struct DeleteStreamRequest {
323    pub stream_id: BucketStreamId,
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
327pub struct DeleteStreamResponse {
328    pub placement: ShardPlacement,
329    pub group_commit_index: u64,
330    pub hard_deleted: bool,
331    pub parent_to_release: Option<BucketStreamId>,
332}
333
334#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
335pub struct ForkRefResponse {
336    pub placement: ShardPlacement,
337    pub fork_ref_count: u64,
338    pub hard_deleted: bool,
339    pub parent_to_release: Option<BucketStreamId>,
340    pub group_commit_index: u64,
341}
342
343#[derive(Debug, Clone, PartialEq, Eq)]
344pub struct FlushColdRequest {
345    pub stream_id: BucketStreamId,
346    pub chunk: ColdChunkRef,
347}
348
349#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
350pub struct FlushColdResponse {
351    pub placement: ShardPlacement,
352    pub hot_start_offset: u64,
353    pub group_commit_index: u64,
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
357pub struct TouchStreamAccessResponse {
358    pub placement: ShardPlacement,
359    pub changed: bool,
360    pub expired: bool,
361    pub group_commit_index: u64,
362}
363
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct PlanColdFlushRequest {
366    pub stream_id: BucketStreamId,
367    pub min_hot_bytes: usize,
368    pub max_flush_bytes: usize,
369}
370
/// Group-wide variant of the cold-flush planning request: the same two byte
/// limits as `PlanColdFlushRequest`, but without a target stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanGroupColdFlushRequest {
    pub min_hot_bytes: usize,
    pub max_flush_bytes: usize,
}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct ColdHotBacklog {
379    pub stream_id: BucketStreamId,
380    pub stream_hot_bytes: u64,
381    pub group_hot_bytes: u64,
382}
383
/// Write-admission setting based on a per-group hot-byte limit.
///
/// The default value has no limit, which [`ColdWriteAdmission::is_enabled`]
/// reports as disabled.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ColdWriteAdmission {
    /// Optional per-group limit on hot bytes; `None` means no limit is set.
    pub max_hot_bytes_per_group: Option<u64>,
}

impl ColdWriteAdmission {
    /// Returns `true` when a per-group limit is set (any value, including `0`).
    pub(crate) fn is_enabled(self) -> bool {
        self.max_hot_bytes_per_group.is_some()
    }
}
394
395#[derive(Debug, Clone, PartialEq, Eq)]
396pub struct AppendRequest {
397    pub stream_id: BucketStreamId,
398    pub content_type: String,
399    pub payload: Bytes,
400    pub close_after: bool,
401    pub stream_seq: Option<String>,
402    pub producer: Option<ProducerRequest>,
403    pub now_ms: u64,
404}
405
406#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
407pub struct AppendExternalRequest {
408    pub stream_id: BucketStreamId,
409    pub content_type: String,
410    pub payload: ExternalPayloadRef,
411    pub close_after: bool,
412    pub stream_seq: Option<String>,
413    pub producer: Option<ProducerRequest>,
414    pub now_ms: u64,
415}
416
417impl AppendExternalRequest {
418    pub fn from_append_request(request: AppendRequest, payload: ExternalPayloadRef) -> Self {
419        Self {
420            stream_id: request.stream_id,
421            content_type: request.content_type,
422            payload,
423            close_after: request.close_after,
424            stream_seq: request.stream_seq,
425            producer: request.producer,
426            now_ms: request.now_ms,
427        }
428    }
429}
430
431impl AppendRequest {
432    pub fn new(stream_id: BucketStreamId, payload_len: u64) -> Self {
433        Self {
434            stream_id,
435            content_type: DEFAULT_CONTENT_TYPE.to_owned(),
436            payload: Bytes::from(vec![
437                0;
438                usize::try_from(payload_len)
439                    .expect("payload_len fits usize")
440            ]),
441            close_after: false,
442            stream_seq: None,
443            producer: None,
444            now_ms: 0,
445        }
446    }
447
448    pub fn from_bytes(stream_id: BucketStreamId, payload: impl Into<Bytes>) -> Self {
449        Self {
450            stream_id,
451            content_type: DEFAULT_CONTENT_TYPE.to_owned(),
452            payload: payload.into(),
453            close_after: false,
454            stream_seq: None,
455            producer: None,
456            now_ms: 0,
457        }
458    }
459
460    pub fn payload_len(&self) -> u64 {
461        u64::try_from(self.payload.len()).expect("payload len fits u64")
462    }
463}
464
465#[derive(Debug, Clone, PartialEq, Eq)]
466pub struct AppendBatchRequest {
467    pub stream_id: BucketStreamId,
468    pub content_type: String,
469    pub payloads: Vec<Bytes>,
470    pub producer: Option<ProducerRequest>,
471    pub now_ms: u64,
472}
473
474impl AppendBatchRequest {
475    pub fn new<P>(stream_id: BucketStreamId, payloads: Vec<P>) -> Self
476    where
477        P: Into<Bytes>,
478    {
479        Self {
480            stream_id,
481            content_type: DEFAULT_CONTENT_TYPE.to_owned(),
482            payloads: payloads.into_iter().map(Into::into).collect(),
483            producer: None,
484            now_ms: 0,
485        }
486    }
487}
488
489#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
490pub struct AppendResponse {
491    pub placement: ShardPlacement,
492    pub start_offset: u64,
493    pub next_offset: u64,
494    pub stream_append_count: u64,
495    pub group_commit_index: u64,
496    pub closed: bool,
497    pub deduplicated: bool,
498    pub producer: Option<ProducerRequest>,
499}
500
501#[derive(Debug, Clone, PartialEq, Eq)]
502pub struct AppendBatchResponse {
503    pub placement: ShardPlacement,
504    pub items: Vec<Result<AppendResponse, RuntimeError>>,
505}
506
507#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
508pub struct StreamAppendCount {
509    pub stream_id: BucketStreamId,
510    pub append_count: u64,
511}