1use std::fmt;
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use ursula_shard::{BucketStreamId, ShardPlacement};
6use ursula_stream::{ColdChunkRef, ExternalPayloadRef, ProducerRequest, StreamSnapshot};
7
8use crate::request::{
9 AppendBatchRequest, AppendExternalRequest, AppendRequest, CloseStreamRequest,
10 CreateStreamExternalRequest, CreateStreamRequest, DeleteStreamRequest, FlushColdRequest,
11 PublishSnapshotRequest, StreamAppendCount,
12};
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct GroupSnapshot {
16 pub placement: ShardPlacement,
17 pub group_commit_index: u64,
18 pub stream_snapshot: StreamSnapshot,
19 pub stream_append_counts: Vec<StreamAppendCount>,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(tag = "command", rename_all = "snake_case")]
24pub enum GroupWriteCommand {
25 CreateStream {
26 stream_id: BucketStreamId,
27 content_type: String,
28 initial_payload: Bytes,
29 close_after: bool,
30 stream_seq: Option<String>,
31 producer: Option<ProducerRequest>,
32 stream_ttl_seconds: Option<u64>,
33 stream_expires_at_ms: Option<u64>,
34 forked_from: Option<BucketStreamId>,
35 fork_offset: Option<u64>,
36 now_ms: u64,
37 },
38 CreateExternal {
39 stream_id: BucketStreamId,
40 content_type: String,
41 initial_payload: ExternalPayloadRef,
42 close_after: bool,
43 stream_seq: Option<String>,
44 producer: Option<ProducerRequest>,
45 stream_ttl_seconds: Option<u64>,
46 stream_expires_at_ms: Option<u64>,
47 forked_from: Option<BucketStreamId>,
48 fork_offset: Option<u64>,
49 now_ms: u64,
50 },
51 Append {
52 stream_id: BucketStreamId,
53 content_type: String,
54 payload: Bytes,
55 close_after: bool,
56 stream_seq: Option<String>,
57 producer: Option<ProducerRequest>,
58 now_ms: u64,
59 },
60 AppendExternal {
61 stream_id: BucketStreamId,
62 content_type: String,
63 payload: ExternalPayloadRef,
64 close_after: bool,
65 stream_seq: Option<String>,
66 producer: Option<ProducerRequest>,
67 now_ms: u64,
68 },
69 AppendBatch {
70 stream_id: BucketStreamId,
71 content_type: String,
72 payloads: Vec<Bytes>,
73 producer: Option<ProducerRequest>,
74 now_ms: u64,
75 },
76 PublishSnapshot {
77 stream_id: BucketStreamId,
78 snapshot_offset: u64,
79 content_type: String,
80 payload: Bytes,
81 now_ms: u64,
82 },
83 TouchStreamAccess {
84 stream_id: BucketStreamId,
85 now_ms: u64,
86 renew_ttl: bool,
87 },
88 AddForkRef {
89 stream_id: BucketStreamId,
90 now_ms: u64,
91 },
92 ReleaseForkRef {
93 stream_id: BucketStreamId,
94 },
95 FlushCold {
96 stream_id: BucketStreamId,
97 chunk: ColdChunkRef,
98 },
99 CloseStream {
100 stream_id: BucketStreamId,
101 stream_seq: Option<String>,
102 producer: Option<ProducerRequest>,
103 now_ms: u64,
104 },
105 DeleteStream {
106 stream_id: BucketStreamId,
107 },
108 Batch {
109 commands: Vec<GroupWriteCommand>,
110 },
111}
112
113impl From<CreateStreamRequest> for GroupWriteCommand {
114 fn from(request: CreateStreamRequest) -> Self {
115 Self::CreateStream {
116 stream_id: request.stream_id,
117 content_type: request.content_type,
118 initial_payload: request.initial_payload,
119 close_after: request.close_after,
120 stream_seq: request.stream_seq,
121 producer: request.producer,
122 stream_ttl_seconds: request.stream_ttl_seconds,
123 stream_expires_at_ms: request.stream_expires_at_ms,
124 forked_from: request.forked_from,
125 fork_offset: request.fork_offset,
126 now_ms: request.now_ms,
127 }
128 }
129}
130
131impl From<&CreateStreamRequest> for GroupWriteCommand {
132 fn from(request: &CreateStreamRequest) -> Self {
133 Self::CreateStream {
134 stream_id: request.stream_id.clone(),
135 content_type: request.content_type.clone(),
136 initial_payload: request.initial_payload.clone(),
137 close_after: request.close_after,
138 stream_seq: request.stream_seq.clone(),
139 producer: request.producer.clone(),
140 stream_ttl_seconds: request.stream_ttl_seconds,
141 stream_expires_at_ms: request.stream_expires_at_ms,
142 forked_from: request.forked_from.clone(),
143 fork_offset: request.fork_offset,
144 now_ms: request.now_ms,
145 }
146 }
147}
148
149impl From<CreateStreamExternalRequest> for GroupWriteCommand {
150 fn from(request: CreateStreamExternalRequest) -> Self {
151 Self::CreateExternal {
152 stream_id: request.stream_id,
153 content_type: request.content_type,
154 initial_payload: request.initial_payload,
155 close_after: request.close_after,
156 stream_seq: request.stream_seq,
157 producer: request.producer,
158 stream_ttl_seconds: request.stream_ttl_seconds,
159 stream_expires_at_ms: request.stream_expires_at_ms,
160 forked_from: request.forked_from,
161 fork_offset: request.fork_offset,
162 now_ms: request.now_ms,
163 }
164 }
165}
166
167impl From<&CreateStreamExternalRequest> for GroupWriteCommand {
168 fn from(request: &CreateStreamExternalRequest) -> Self {
169 Self::CreateExternal {
170 stream_id: request.stream_id.clone(),
171 content_type: request.content_type.clone(),
172 initial_payload: request.initial_payload.clone(),
173 close_after: request.close_after,
174 stream_seq: request.stream_seq.clone(),
175 producer: request.producer.clone(),
176 stream_ttl_seconds: request.stream_ttl_seconds,
177 stream_expires_at_ms: request.stream_expires_at_ms,
178 forked_from: request.forked_from.clone(),
179 fork_offset: request.fork_offset,
180 now_ms: request.now_ms,
181 }
182 }
183}
184
185impl From<AppendRequest> for GroupWriteCommand {
186 fn from(request: AppendRequest) -> Self {
187 Self::Append {
188 stream_id: request.stream_id,
189 content_type: request.content_type,
190 payload: request.payload,
191 close_after: request.close_after,
192 stream_seq: request.stream_seq,
193 producer: request.producer,
194 now_ms: request.now_ms,
195 }
196 }
197}
198
199impl From<&AppendRequest> for GroupWriteCommand {
200 fn from(request: &AppendRequest) -> Self {
201 Self::Append {
202 stream_id: request.stream_id.clone(),
203 content_type: request.content_type.clone(),
204 payload: request.payload.clone(),
205 close_after: request.close_after,
206 stream_seq: request.stream_seq.clone(),
207 producer: request.producer.clone(),
208 now_ms: request.now_ms,
209 }
210 }
211}
212
213impl From<AppendExternalRequest> for GroupWriteCommand {
214 fn from(request: AppendExternalRequest) -> Self {
215 Self::AppendExternal {
216 stream_id: request.stream_id,
217 content_type: request.content_type,
218 payload: request.payload,
219 close_after: request.close_after,
220 stream_seq: request.stream_seq,
221 producer: request.producer,
222 now_ms: request.now_ms,
223 }
224 }
225}
226
227impl From<&AppendExternalRequest> for GroupWriteCommand {
228 fn from(request: &AppendExternalRequest) -> Self {
229 Self::AppendExternal {
230 stream_id: request.stream_id.clone(),
231 content_type: request.content_type.clone(),
232 payload: request.payload.clone(),
233 close_after: request.close_after,
234 stream_seq: request.stream_seq.clone(),
235 producer: request.producer.clone(),
236 now_ms: request.now_ms,
237 }
238 }
239}
240
241impl From<AppendBatchRequest> for GroupWriteCommand {
242 fn from(request: AppendBatchRequest) -> Self {
243 Self::AppendBatch {
244 stream_id: request.stream_id,
245 content_type: request.content_type,
246 payloads: request.payloads,
247 producer: request.producer,
248 now_ms: request.now_ms,
249 }
250 }
251}
252
253impl From<&AppendBatchRequest> for GroupWriteCommand {
254 fn from(request: &AppendBatchRequest) -> Self {
255 Self::AppendBatch {
256 stream_id: request.stream_id.clone(),
257 content_type: request.content_type.clone(),
258 payloads: request.payloads.clone(),
259 producer: request.producer.clone(),
260 now_ms: request.now_ms,
261 }
262 }
263}
264
265impl From<PublishSnapshotRequest> for GroupWriteCommand {
266 fn from(request: PublishSnapshotRequest) -> Self {
267 Self::PublishSnapshot {
268 stream_id: request.stream_id,
269 snapshot_offset: request.snapshot_offset,
270 content_type: request.content_type,
271 payload: request.payload,
272 now_ms: request.now_ms,
273 }
274 }
275}
276
277impl From<&PublishSnapshotRequest> for GroupWriteCommand {
278 fn from(request: &PublishSnapshotRequest) -> Self {
279 Self::PublishSnapshot {
280 stream_id: request.stream_id.clone(),
281 snapshot_offset: request.snapshot_offset,
282 content_type: request.content_type.clone(),
283 payload: request.payload.clone(),
284 now_ms: request.now_ms,
285 }
286 }
287}
288
289impl From<CloseStreamRequest> for GroupWriteCommand {
290 fn from(request: CloseStreamRequest) -> Self {
291 Self::CloseStream {
292 stream_id: request.stream_id,
293 stream_seq: request.stream_seq,
294 producer: request.producer,
295 now_ms: request.now_ms,
296 }
297 }
298}
299
300impl From<&CloseStreamRequest> for GroupWriteCommand {
301 fn from(request: &CloseStreamRequest) -> Self {
302 Self::CloseStream {
303 stream_id: request.stream_id.clone(),
304 stream_seq: request.stream_seq.clone(),
305 producer: request.producer.clone(),
306 now_ms: request.now_ms,
307 }
308 }
309}
310
311impl From<DeleteStreamRequest> for GroupWriteCommand {
312 fn from(request: DeleteStreamRequest) -> Self {
313 Self::DeleteStream {
314 stream_id: request.stream_id,
315 }
316 }
317}
318
319impl From<&DeleteStreamRequest> for GroupWriteCommand {
320 fn from(request: &DeleteStreamRequest) -> Self {
321 Self::DeleteStream {
322 stream_id: request.stream_id.clone(),
323 }
324 }
325}
326
327impl From<FlushColdRequest> for GroupWriteCommand {
328 fn from(request: FlushColdRequest) -> Self {
329 Self::FlushCold {
330 stream_id: request.stream_id,
331 chunk: request.chunk,
332 }
333 }
334}
335
336impl From<&FlushColdRequest> for GroupWriteCommand {
337 fn from(request: &FlushColdRequest) -> Self {
338 Self::FlushCold {
339 stream_id: request.stream_id.clone(),
340 chunk: request.chunk.clone(),
341 }
342 }
343}
344
345impl fmt::Display for GroupWriteCommand {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 match self {
348 Self::CreateStream { stream_id, .. } => {
349 write!(f, "create_stream:{stream_id}")
350 }
351 Self::CreateExternal {
352 stream_id,
353 initial_payload,
354 ..
355 } => {
356 write!(
357 f,
358 "create_external:{stream_id}:{} bytes",
359 initial_payload.payload_len
360 )
361 }
362 Self::Append {
363 stream_id, payload, ..
364 } => {
365 write!(f, "append:{stream_id}:{} bytes", payload.len())
366 }
367 Self::AppendExternal {
368 stream_id, payload, ..
369 } => {
370 write!(
371 f,
372 "append_external:{stream_id}:{} bytes",
373 payload.payload_len
374 )
375 }
376 Self::AppendBatch {
377 stream_id,
378 payloads,
379 ..
380 } => {
381 write!(f, "append_batch:{stream_id}:{} items", payloads.len())
382 }
383 Self::PublishSnapshot {
384 stream_id,
385 snapshot_offset,
386 payload,
387 ..
388 } => {
389 write!(
390 f,
391 "publish_snapshot:{stream_id}:{snapshot_offset}:{} bytes",
392 payload.len()
393 )
394 }
395 Self::TouchStreamAccess {
396 stream_id,
397 renew_ttl,
398 ..
399 } => {
400 write!(f, "touch_stream_access:{stream_id}:renew_ttl={renew_ttl}")
401 }
402 Self::AddForkRef { stream_id, .. } => {
403 write!(f, "add_fork_ref:{stream_id}")
404 }
405 Self::ReleaseForkRef { stream_id } => {
406 write!(f, "release_fork_ref:{stream_id}")
407 }
408 Self::FlushCold { stream_id, chunk } => {
409 write!(
410 f,
411 "flush_cold:{stream_id}:{}..{}",
412 chunk.start_offset, chunk.end_offset
413 )
414 }
415 Self::CloseStream { stream_id, .. } => {
416 write!(f, "close_stream:{stream_id}")
417 }
418 Self::DeleteStream { stream_id } => {
419 write!(f, "delete_stream:{stream_id}")
420 }
421 Self::Batch { commands } => {
422 write!(f, "batch:{} commands", commands.len())
423 }
424 }
425 }
426}