Skip to main content

ursula_runtime/
command.rs

1use std::fmt;
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use ursula_shard::{BucketStreamId, ShardPlacement};
6use ursula_stream::{ColdChunkRef, ExternalPayloadRef, ProducerRequest, StreamSnapshot};
7
8use crate::request::{
9    AppendBatchRequest, AppendExternalRequest, AppendRequest, CloseStreamRequest,
10    CreateStreamExternalRequest, CreateStreamRequest, DeleteStreamRequest, FlushColdRequest,
11    PublishSnapshotRequest, StreamAppendCount,
12};
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct GroupSnapshot {
16    pub placement: ShardPlacement,
17    pub group_commit_index: u64,
18    pub stream_snapshot: StreamSnapshot,
19    pub stream_append_counts: Vec<StreamAppendCount>,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(tag = "command", rename_all = "snake_case")]
24pub enum GroupWriteCommand {
25    CreateStream {
26        stream_id: BucketStreamId,
27        content_type: String,
28        initial_payload: Bytes,
29        close_after: bool,
30        stream_seq: Option<String>,
31        producer: Option<ProducerRequest>,
32        stream_ttl_seconds: Option<u64>,
33        stream_expires_at_ms: Option<u64>,
34        forked_from: Option<BucketStreamId>,
35        fork_offset: Option<u64>,
36        now_ms: u64,
37    },
38    CreateExternal {
39        stream_id: BucketStreamId,
40        content_type: String,
41        initial_payload: ExternalPayloadRef,
42        close_after: bool,
43        stream_seq: Option<String>,
44        producer: Option<ProducerRequest>,
45        stream_ttl_seconds: Option<u64>,
46        stream_expires_at_ms: Option<u64>,
47        forked_from: Option<BucketStreamId>,
48        fork_offset: Option<u64>,
49        now_ms: u64,
50    },
51    Append {
52        stream_id: BucketStreamId,
53        content_type: String,
54        payload: Bytes,
55        close_after: bool,
56        stream_seq: Option<String>,
57        producer: Option<ProducerRequest>,
58        now_ms: u64,
59    },
60    AppendExternal {
61        stream_id: BucketStreamId,
62        content_type: String,
63        payload: ExternalPayloadRef,
64        close_after: bool,
65        stream_seq: Option<String>,
66        producer: Option<ProducerRequest>,
67        now_ms: u64,
68    },
69    AppendBatch {
70        stream_id: BucketStreamId,
71        content_type: String,
72        payloads: Vec<Bytes>,
73        producer: Option<ProducerRequest>,
74        now_ms: u64,
75    },
76    PublishSnapshot {
77        stream_id: BucketStreamId,
78        snapshot_offset: u64,
79        content_type: String,
80        payload: Bytes,
81        now_ms: u64,
82    },
83    TouchStreamAccess {
84        stream_id: BucketStreamId,
85        now_ms: u64,
86        renew_ttl: bool,
87    },
88    AddForkRef {
89        stream_id: BucketStreamId,
90        now_ms: u64,
91    },
92    ReleaseForkRef {
93        stream_id: BucketStreamId,
94    },
95    FlushCold {
96        stream_id: BucketStreamId,
97        chunk: ColdChunkRef,
98    },
99    CloseStream {
100        stream_id: BucketStreamId,
101        stream_seq: Option<String>,
102        producer: Option<ProducerRequest>,
103        now_ms: u64,
104    },
105    DeleteStream {
106        stream_id: BucketStreamId,
107    },
108    Batch {
109        commands: Vec<GroupWriteCommand>,
110    },
111}
112
113impl From<CreateStreamRequest> for GroupWriteCommand {
114    fn from(request: CreateStreamRequest) -> Self {
115        Self::CreateStream {
116            stream_id: request.stream_id,
117            content_type: request.content_type,
118            initial_payload: request.initial_payload,
119            close_after: request.close_after,
120            stream_seq: request.stream_seq,
121            producer: request.producer,
122            stream_ttl_seconds: request.stream_ttl_seconds,
123            stream_expires_at_ms: request.stream_expires_at_ms,
124            forked_from: request.forked_from,
125            fork_offset: request.fork_offset,
126            now_ms: request.now_ms,
127        }
128    }
129}
130
131impl From<&CreateStreamRequest> for GroupWriteCommand {
132    fn from(request: &CreateStreamRequest) -> Self {
133        Self::CreateStream {
134            stream_id: request.stream_id.clone(),
135            content_type: request.content_type.clone(),
136            initial_payload: request.initial_payload.clone(),
137            close_after: request.close_after,
138            stream_seq: request.stream_seq.clone(),
139            producer: request.producer.clone(),
140            stream_ttl_seconds: request.stream_ttl_seconds,
141            stream_expires_at_ms: request.stream_expires_at_ms,
142            forked_from: request.forked_from.clone(),
143            fork_offset: request.fork_offset,
144            now_ms: request.now_ms,
145        }
146    }
147}
148
149impl From<CreateStreamExternalRequest> for GroupWriteCommand {
150    fn from(request: CreateStreamExternalRequest) -> Self {
151        Self::CreateExternal {
152            stream_id: request.stream_id,
153            content_type: request.content_type,
154            initial_payload: request.initial_payload,
155            close_after: request.close_after,
156            stream_seq: request.stream_seq,
157            producer: request.producer,
158            stream_ttl_seconds: request.stream_ttl_seconds,
159            stream_expires_at_ms: request.stream_expires_at_ms,
160            forked_from: request.forked_from,
161            fork_offset: request.fork_offset,
162            now_ms: request.now_ms,
163        }
164    }
165}
166
167impl From<&CreateStreamExternalRequest> for GroupWriteCommand {
168    fn from(request: &CreateStreamExternalRequest) -> Self {
169        Self::CreateExternal {
170            stream_id: request.stream_id.clone(),
171            content_type: request.content_type.clone(),
172            initial_payload: request.initial_payload.clone(),
173            close_after: request.close_after,
174            stream_seq: request.stream_seq.clone(),
175            producer: request.producer.clone(),
176            stream_ttl_seconds: request.stream_ttl_seconds,
177            stream_expires_at_ms: request.stream_expires_at_ms,
178            forked_from: request.forked_from.clone(),
179            fork_offset: request.fork_offset,
180            now_ms: request.now_ms,
181        }
182    }
183}
184
185impl From<AppendRequest> for GroupWriteCommand {
186    fn from(request: AppendRequest) -> Self {
187        Self::Append {
188            stream_id: request.stream_id,
189            content_type: request.content_type,
190            payload: request.payload,
191            close_after: request.close_after,
192            stream_seq: request.stream_seq,
193            producer: request.producer,
194            now_ms: request.now_ms,
195        }
196    }
197}
198
199impl From<&AppendRequest> for GroupWriteCommand {
200    fn from(request: &AppendRequest) -> Self {
201        Self::Append {
202            stream_id: request.stream_id.clone(),
203            content_type: request.content_type.clone(),
204            payload: request.payload.clone(),
205            close_after: request.close_after,
206            stream_seq: request.stream_seq.clone(),
207            producer: request.producer.clone(),
208            now_ms: request.now_ms,
209        }
210    }
211}
212
213impl From<AppendExternalRequest> for GroupWriteCommand {
214    fn from(request: AppendExternalRequest) -> Self {
215        Self::AppendExternal {
216            stream_id: request.stream_id,
217            content_type: request.content_type,
218            payload: request.payload,
219            close_after: request.close_after,
220            stream_seq: request.stream_seq,
221            producer: request.producer,
222            now_ms: request.now_ms,
223        }
224    }
225}
226
227impl From<&AppendExternalRequest> for GroupWriteCommand {
228    fn from(request: &AppendExternalRequest) -> Self {
229        Self::AppendExternal {
230            stream_id: request.stream_id.clone(),
231            content_type: request.content_type.clone(),
232            payload: request.payload.clone(),
233            close_after: request.close_after,
234            stream_seq: request.stream_seq.clone(),
235            producer: request.producer.clone(),
236            now_ms: request.now_ms,
237        }
238    }
239}
240
241impl From<AppendBatchRequest> for GroupWriteCommand {
242    fn from(request: AppendBatchRequest) -> Self {
243        Self::AppendBatch {
244            stream_id: request.stream_id,
245            content_type: request.content_type,
246            payloads: request.payloads,
247            producer: request.producer,
248            now_ms: request.now_ms,
249        }
250    }
251}
252
253impl From<&AppendBatchRequest> for GroupWriteCommand {
254    fn from(request: &AppendBatchRequest) -> Self {
255        Self::AppendBatch {
256            stream_id: request.stream_id.clone(),
257            content_type: request.content_type.clone(),
258            payloads: request.payloads.clone(),
259            producer: request.producer.clone(),
260            now_ms: request.now_ms,
261        }
262    }
263}
264
265impl From<PublishSnapshotRequest> for GroupWriteCommand {
266    fn from(request: PublishSnapshotRequest) -> Self {
267        Self::PublishSnapshot {
268            stream_id: request.stream_id,
269            snapshot_offset: request.snapshot_offset,
270            content_type: request.content_type,
271            payload: request.payload,
272            now_ms: request.now_ms,
273        }
274    }
275}
276
277impl From<&PublishSnapshotRequest> for GroupWriteCommand {
278    fn from(request: &PublishSnapshotRequest) -> Self {
279        Self::PublishSnapshot {
280            stream_id: request.stream_id.clone(),
281            snapshot_offset: request.snapshot_offset,
282            content_type: request.content_type.clone(),
283            payload: request.payload.clone(),
284            now_ms: request.now_ms,
285        }
286    }
287}
288
289impl From<CloseStreamRequest> for GroupWriteCommand {
290    fn from(request: CloseStreamRequest) -> Self {
291        Self::CloseStream {
292            stream_id: request.stream_id,
293            stream_seq: request.stream_seq,
294            producer: request.producer,
295            now_ms: request.now_ms,
296        }
297    }
298}
299
300impl From<&CloseStreamRequest> for GroupWriteCommand {
301    fn from(request: &CloseStreamRequest) -> Self {
302        Self::CloseStream {
303            stream_id: request.stream_id.clone(),
304            stream_seq: request.stream_seq.clone(),
305            producer: request.producer.clone(),
306            now_ms: request.now_ms,
307        }
308    }
309}
310
311impl From<DeleteStreamRequest> for GroupWriteCommand {
312    fn from(request: DeleteStreamRequest) -> Self {
313        Self::DeleteStream {
314            stream_id: request.stream_id,
315        }
316    }
317}
318
319impl From<&DeleteStreamRequest> for GroupWriteCommand {
320    fn from(request: &DeleteStreamRequest) -> Self {
321        Self::DeleteStream {
322            stream_id: request.stream_id.clone(),
323        }
324    }
325}
326
327impl From<FlushColdRequest> for GroupWriteCommand {
328    fn from(request: FlushColdRequest) -> Self {
329        Self::FlushCold {
330            stream_id: request.stream_id,
331            chunk: request.chunk,
332        }
333    }
334}
335
336impl From<&FlushColdRequest> for GroupWriteCommand {
337    fn from(request: &FlushColdRequest) -> Self {
338        Self::FlushCold {
339            stream_id: request.stream_id.clone(),
340            chunk: request.chunk.clone(),
341        }
342    }
343}
344
345impl fmt::Display for GroupWriteCommand {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        match self {
348            Self::CreateStream { stream_id, .. } => {
349                write!(f, "create_stream:{stream_id}")
350            }
351            Self::CreateExternal {
352                stream_id,
353                initial_payload,
354                ..
355            } => {
356                write!(
357                    f,
358                    "create_external:{stream_id}:{} bytes",
359                    initial_payload.payload_len
360                )
361            }
362            Self::Append {
363                stream_id, payload, ..
364            } => {
365                write!(f, "append:{stream_id}:{} bytes", payload.len())
366            }
367            Self::AppendExternal {
368                stream_id, payload, ..
369            } => {
370                write!(
371                    f,
372                    "append_external:{stream_id}:{} bytes",
373                    payload.payload_len
374                )
375            }
376            Self::AppendBatch {
377                stream_id,
378                payloads,
379                ..
380            } => {
381                write!(f, "append_batch:{stream_id}:{} items", payloads.len())
382            }
383            Self::PublishSnapshot {
384                stream_id,
385                snapshot_offset,
386                payload,
387                ..
388            } => {
389                write!(
390                    f,
391                    "publish_snapshot:{stream_id}:{snapshot_offset}:{} bytes",
392                    payload.len()
393                )
394            }
395            Self::TouchStreamAccess {
396                stream_id,
397                renew_ttl,
398                ..
399            } => {
400                write!(f, "touch_stream_access:{stream_id}:renew_ttl={renew_ttl}")
401            }
402            Self::AddForkRef { stream_id, .. } => {
403                write!(f, "add_fork_ref:{stream_id}")
404            }
405            Self::ReleaseForkRef { stream_id } => {
406                write!(f, "release_fork_ref:{stream_id}")
407            }
408            Self::FlushCold { stream_id, chunk } => {
409                write!(
410                    f,
411                    "flush_cold:{stream_id}:{}..{}",
412                    chunk.start_offset, chunk.end_offset
413                )
414            }
415            Self::CloseStream { stream_id, .. } => {
416                write!(f, "close_stream:{stream_id}")
417            }
418            Self::DeleteStream { stream_id } => {
419                write!(f, "delete_stream:{stream_id}")
420            }
421            Self::Batch { commands } => {
422                write!(f, "batch:{} commands", commands.len())
423            }
424        }
425    }
426}