1use std::collections::{HashMap, HashSet};
2
3use serde::{Deserialize, Serialize};
4use ursula_proto::{ColdChunkRefV1, ExternalPayloadRefV1, ProducerRequestV1};
5use ursula_shard::BucketStreamId;
6
/// A mutation applied to a [`StreamStateMachine`] through [`StreamStateMachine::apply`].
///
/// The enum derives `Serialize`/`Deserialize`, so for positional encodings the order of
/// variants and fields is part of the encoded form; append new variants/fields at the end.
/// Timestamps (`now_ms`) travel inside the command instead of being read from a clock by
/// the state machine. NOTE(review): presumably so that replaying the same command log
/// reproduces the same state; confirm with the replication layer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamCommand {
    /// Creates the bucket `bucket_id`.
    CreateBucket {
        bucket_id: String,
    },
    /// Deletes the bucket `bucket_id`.
    DeleteBucket {
        bucket_id: String,
    },
    /// Creates `stream_id` with an inline initial payload.
    ///
    /// `forked_from` / `fork_offset` identify the parent stream and branch point when the
    /// stream is created as a fork.
    CreateStream {
        stream_id: BucketStreamId,
        content_type: String,
        initial_payload: Vec<u8>,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
        forked_from: Option<BucketStreamId>,
        fork_offset: Option<u64>,
        now_ms: u64,
    },
    /// Same as `CreateStream`, but the initial payload is a reference to externally
    /// stored bytes ([`ExternalPayloadRef`]) instead of inline bytes.
    CreateExternal {
        stream_id: BucketStreamId,
        content_type: String,
        initial_payload: ExternalPayloadRef,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
        forked_from: Option<BucketStreamId>,
        fork_offset: Option<u64>,
        now_ms: u64,
    },
    /// Appends one inline payload; `close_after` asks for the stream to be closed by the
    /// same command.
    Append {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payload: Vec<u8>,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Appends one externally stored payload reference.
    AppendExternal {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payload: ExternalPayloadRef,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Appends several inline payloads in one command. `apply` reports only the offsets
    /// of the last item in its `Appended` response.
    AppendBatch {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payloads: Vec<Vec<u8>>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Publishes a snapshot of the stream taken at `snapshot_offset`; answered with
    /// `SnapshotPublished`.
    PublishSnapshot {
        stream_id: BucketStreamId,
        snapshot_offset: u64,
        content_type: String,
        payload: Vec<u8>,
        now_ms: u64,
    },
    /// Records an access to the stream. `renew_ttl` requests TTL renewal; see
    /// `StreamStateMachine::access_requires_write` for when that needs a write.
    TouchStreamAccess {
        stream_id: BucketStreamId,
        now_ms: u64,
        renew_ttl: bool,
    },
    /// Adds a fork reference to `stream_id`; answered with `ForkRefAdded`.
    AddForkRef {
        stream_id: BucketStreamId,
        now_ms: u64,
    },
    /// Releases a fork reference on `stream_id`; answered with `ForkRefReleased`.
    ReleaseForkRef {
        stream_id: BucketStreamId,
    },
    /// Records that a hot range of `stream_id` has been written out as `chunk`;
    /// answered with `ColdFlushed`.
    FlushCold {
        stream_id: BucketStreamId,
        chunk: ColdChunkRef,
    },
    /// Closes the stream.
    Close {
        stream_id: BucketStreamId,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Deletes the stream.
    DeleteStream {
        stream_id: BucketStreamId,
    },
}
99
/// Outcome of applying a [`StreamCommand`].
///
/// Failures are reported in-band through [`StreamResponse::Error`]; `apply` itself is
/// infallible.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamResponse {
    BucketCreated {
        bucket_id: String,
    },
    BucketAlreadyExists {
        bucket_id: String,
    },
    BucketDeleted {
        bucket_id: String,
    },
    /// A stream was created; `next_offset` is its tail after the initial payload.
    Created {
        stream_id: BucketStreamId,
        next_offset: u64,
        closed: bool,
    },
    /// The stream already existed; the fields describe the existing stream.
    AlreadyExists {
        next_offset: u64,
        closed: bool,
        content_type: String,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
    },
    /// A payload was appended at `[offset, next_offset)`.
    ///
    /// `deduplicated` marks a request that was recognized as already applied.
    /// NOTE(review): presumably a producer retry; confirm in the append handlers.
    Appended {
        offset: u64,
        next_offset: u64,
        closed: bool,
        deduplicated: bool,
        producer: Option<ProducerRequest>,
    },
    Closed {
        next_offset: u64,
        deduplicated: bool,
        producer: Option<ProducerRequest>,
    },
    /// The stream was deleted.
    ///
    /// `hard_deleted` distinguishes removal from soft deletion. `parent_to_release`
    /// names a fork parent, presumably one whose reference the caller must release next.
    Deleted {
        hard_deleted: bool,
        parent_to_release: Option<BucketStreamId>,
    },
    ForkRefAdded {
        fork_ref_count: u64,
    },
    ForkRefReleased {
        hard_deleted: bool,
        fork_ref_count: u64,
        parent_to_release: Option<BucketStreamId>,
    },
    /// A cold flush was recorded; `hot_start_offset` is the first offset still hot.
    ColdFlushed {
        hot_start_offset: u64,
    },
    SnapshotPublished {
        snapshot_offset: u64,
    },
    Accessed {
        changed: bool,
        expired: bool,
    },
    /// The command failed.
    ///
    /// `next_offset` is set for some errors, e.g. the tail offset for
    /// `OffsetOutOfRange` and the earliest retained offset for a too-old read.
    Error {
        code: StreamErrorCode,
        message: String,
        next_offset: Option<u64>,
    },
}
163
/// Machine-readable category carried by [`StreamResponse::Error`].
///
/// Serde-derived, so variant order is part of positional encodings; append new codes at
/// the end. Note that `InvalidColdFlush` is also used by the read path when payload
/// segment metadata has gaps, or when a read needs an object payload store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamErrorCode {
    InvalidBucketId,
    InvalidStreamId,
    BucketNotFound,
    BucketNotEmpty,
    StreamNotFound,
    StreamGone,
    StreamAlreadyExistsConflict,
    MissingContentType,
    ContentTypeMismatch,
    EmptyAppend,
    StreamClosed,
    StreamSeqConflict,
    InvalidProducer,
    ProducerEpochStale,
    ProducerSeqConflict,
    InvalidRetention,
    InvalidFork,
    OffsetOutOfRange,
    InvalidColdFlush,
    InvalidSnapshot,
    SnapshotNotFound,
    SnapshotConflict,
}
189
190impl StreamResponse {
191 fn error(code: StreamErrorCode, message: impl Into<String>) -> Self {
192 Self::Error {
193 code,
194 message: message.into(),
195 next_offset: None,
196 }
197 }
198
199 fn error_with_next_offset(
200 code: StreamErrorCode,
201 message: impl Into<String>,
202 next_offset: u64,
203 ) -> Self {
204 Self::Error {
205 code,
206 message: message.into(),
207 next_offset: Some(next_offset),
208 }
209 }
210}
211
/// Lifecycle state of a stream.
///
/// Soft-deleted streams keep their metadata in the state machine, but read paths answer
/// `StreamGone` for them. NOTE(review): assumes `is_soft_deleted` checks for
/// `SoftDeleted`; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamStatus {
    Open,
    Closed,
    SoftDeleted,
}
218
/// Per-stream metadata stored by [`StreamStateMachine`] and included in snapshots.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamMetadata {
    pub stream_id: BucketStreamId,
    pub content_type: String,
    pub status: StreamStatus,
    /// Exclusive end of the stream's content; reads may start at any offset up to here.
    pub tail_offset: u64,
    pub last_stream_seq: Option<String>,
    pub stream_ttl_seconds: Option<u64>,
    pub stream_expires_at_ms: Option<u64>,
    pub created_at_ms: u64,
    /// Time of the last TTL renewal. A TTL-managed access at a different time needs a
    /// write (see `access_requires_write`).
    pub last_ttl_touch_at_ms: u64,
    pub forked_from: Option<BucketStreamId>,
    pub fork_offset: Option<u64>,
    pub fork_ref_count: u64,
}
234
/// Alias for the wire-level producer request type [`ProducerRequestV1`].
pub type ProducerRequest = ProducerRequestV1;

/// Borrowed arguments for appending one inline payload.
///
/// `StreamStateMachine::apply` builds this from [`StreamCommand::Append`] by borrowing
/// the command's `content_type` and `payload`.
#[derive(Debug)]
pub struct AppendStreamInput<'a> {
    pub stream_id: BucketStreamId,
    pub content_type: Option<&'a str>,
    pub payload: &'a [u8],
    pub close_after: bool,
    pub stream_seq: Option<String>,
    pub producer: Option<ProducerRequest>,
    pub now_ms: u64,
}

/// Arguments for appending an externally stored payload reference.
///
/// This is the counterpart of [`AppendStreamInput`], built from
/// [`StreamCommand::AppendExternal`].
#[derive(Debug)]
struct AppendExternalInput<'a> {
    stream_id: BucketStreamId,
    content_type: Option<&'a str>,
    payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    now_ms: u64,
}
258
/// Serializable form of one producer's state, stored in
/// [`StreamSnapshotEntry::producer_states`].
///
/// The `last_*` fields describe the producer's most recently accepted request.
/// NOTE(review): presumably kept so that a retried request can be answered without
/// re-appending; confirm in the append handlers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerSnapshot {
    pub producer_id: String,
    pub producer_epoch: u64,
    pub producer_seq: u64,
    pub last_start_offset: u64,
    pub last_next_offset: u64,
    pub last_closed: bool,
    pub last_items: Vec<ProducerAppendRecord>,
}

/// Offsets and close flag recorded for one item of a producer's last request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerAppendRecord {
    pub start_offset: u64,
    pub next_offset: u64,
    pub closed: bool,
}

/// In-memory form of [`ProducerSnapshot`].
///
/// The producer id is not stored here; it is the key of the inner map in
/// `StreamStateMachine::producers`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ProducerState {
    producer_epoch: u64,
    producer_seq: u64,
    last_start_offset: u64,
    last_next_offset: u64,
    last_closed: bool,
    last_items: Vec<ProducerAppendRecord>,
}
286
/// Result of a batch append.
///
/// `StreamStateMachine::apply` reports only the last element of `items`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBatchAppend {
    pub items: Vec<StreamBatchAppendItem>,
    pub deduplicated: bool,
}

/// Offsets `[offset, next_offset)` assigned to one payload of a batch append.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBatchAppendItem {
    pub offset: u64,
    pub next_offset: u64,
    pub closed: bool,
    pub deduplicated: bool,
}

/// Materialized result of `StreamStateMachine::read` for `[offset, next_offset)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamRead {
    pub offset: u64,
    pub next_offset: u64,
    pub content_type: String,
    /// The bytes of the range, concatenated in offset order.
    pub payload: Vec<u8>,
    /// True when `next_offset` reached the stream tail.
    pub up_to_date: bool,
    /// True when the stream status is `Closed`.
    pub closed: bool,
}
310
/// Alias for the wire-level cold chunk reference [`ColdChunkRefV1`].
pub type ColdChunkRef = ColdChunkRefV1;

/// Reference to stored object bytes that back the stream range
/// `[start_offset, end_offset)`.
///
/// Used both for cold chunks (via `From<&ColdChunkRef>`) and for external payloads.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectPayloadRef {
    pub start_offset: u64,
    pub end_offset: u64,
    pub s3_path: String,
    /// Stored object size; cold-flush planning sets it to the flushed byte count.
    pub object_size: u64,
}
320
321impl From<&ColdChunkRef> for ObjectPayloadRef {
322 fn from(chunk: &ColdChunkRef) -> Self {
323 Self {
324 start_offset: chunk.start_offset,
325 end_offset: chunk.end_offset,
326 s3_path: chunk.s3_path.clone(),
327 object_size: chunk.object_size,
328 }
329 }
330}
331
/// Alias for the wire-level external payload reference [`ExternalPayloadRefV1`].
pub type ExternalPayloadRef = ExternalPayloadRefV1;

/// Maps the stream range `[start_offset, end_offset)` onto the byte range
/// `[payload_start, payload_end)` of the stream's in-memory (hot) payload buffer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HotPayloadSegment {
    pub start_offset: u64,
    pub end_offset: u64,
    pub payload_start: usize,
    pub payload_end: usize,
}

/// A planned cold flush: `payload` is a copy of the hot bytes for the stream range
/// `[start_offset, end_offset)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColdFlushCandidate {
    pub stream_id: BucketStreamId,
    pub start_offset: u64,
    pub end_offset: u64,
    pub payload: Vec<u8>,
}
349
/// A portion of a cold chunk to read.
///
/// NOTE(review): not constructed in this part of the file; `read_plan_at` emits cold
/// chunks as [`StreamReadSegment::Object`] instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadColdSegment {
    pub chunk: ColdChunkRef,
    pub read_start_offset: u64,
    pub len: usize,
}

/// A portion of a stored object to read: `len` bytes starting at stream offset
/// `read_start_offset`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadObjectSegment {
    pub object: ObjectPayloadRef,
    pub read_start_offset: u64,
    pub len: usize,
}

/// One piece of a read plan: either a reference that must be fetched from object
/// storage, or hot bytes already copied out of memory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamReadSegment {
    Object(StreamReadObjectSegment),
    Hot(Vec<u8>),
}

/// Plan for reading `[offset, next_offset)`; `segments` are ordered by start offset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadPlan {
    pub offset: u64,
    pub next_offset: u64,
    pub content_type: String,
    pub segments: Vec<StreamReadSegment>,
    pub up_to_date: bool,
    pub closed: bool,
}
379
/// Stream range `[start_offset, end_offset)` of one recorded message boundary.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamMessageRecord {
    pub start_offset: u64,
    pub end_offset: u64,
}

/// The snapshot currently visible for a stream: `payload` (of `content_type`) taken at
/// stream offset `offset`.
///
/// On restore, data before `offset` no longer needs to be backed by payload sources.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamVisibleSnapshot {
    pub offset: u64,
    pub content_type: String,
    pub payload: Vec<u8>,
}

/// A snapshot plus the message records that follow it.
///
/// NOTE(review): not constructed in this part of the file; presumably used to
/// bootstrap a reader from the latest snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBootstrapPlan {
    pub snapshot: Option<StreamVisibleSnapshot>,
    pub updates: Vec<StreamMessageRecord>,
    pub next_offset: u64,
    pub content_type: String,
    pub up_to_date: bool,
    pub closed: bool,
}
402
/// Serializable image of a whole [`StreamStateMachine`].
///
/// Produced by `StreamStateMachine::snapshot`, which sorts both vectors, and consumed
/// by `StreamStateMachine::restore`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamSnapshot {
    pub buckets: Vec<String>,
    pub streams: Vec<StreamSnapshotEntry>,
}

/// Everything stored for one stream inside a [`StreamSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamSnapshotEntry {
    pub metadata: StreamMetadata,
    pub hot_start_offset: u64,
    /// The stream's hot payload buffer; `hot_segments` index into it.
    pub payload: Vec<u8>,
    /// May be empty with a non-empty `payload`; `restore` then treats the whole payload
    /// as one segment ending at the tail.
    pub hot_segments: Vec<HotPayloadSegment>,
    pub cold_chunks: Vec<ColdChunkRef>,
    pub external_segments: Vec<ObjectPayloadRef>,
    pub message_records: Vec<StreamMessageRecord>,
    pub visible_snapshot: Option<StreamVisibleSnapshot>,
    pub producer_states: Vec<ProducerSnapshot>,
}
421
/// Validation failure reported by `StreamStateMachine::restore`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamSnapshotError {
    DuplicateBucket(String),
    DuplicateStream(BucketStreamId),
    DuplicateProducer {
        stream_id: BucketStreamId,
        producer_id: String,
    },
    /// The stream's bucket is not listed in the snapshot.
    MissingBucket(BucketStreamId),
    /// Hot segments do not fit the payload, or the payload sources (hot, cold and
    /// external) do not cover the retained range up to `tail_offset`.
    PayloadLengthMismatch {
        stream_id: BucketStreamId,
        tail_offset: u64,
        payload_len: usize,
    },
    /// Message records do not cover the retained range.
    MessageBoundaryMismatch {
        stream_id: BucketStreamId,
    },
    /// The visible snapshot lies beyond the stream tail.
    SnapshotOffsetOutOfRange {
        stream_id: BucketStreamId,
        snapshot_offset: u64,
        tail_offset: u64,
    },
}
445
446impl std::fmt::Display for StreamSnapshotError {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 match self {
449 Self::DuplicateBucket(bucket_id) => {
450 write!(f, "snapshot contains duplicate bucket '{bucket_id}'")
451 }
452 Self::DuplicateStream(stream_id) => {
453 write!(f, "snapshot contains duplicate stream '{stream_id}'")
454 }
455 Self::DuplicateProducer {
456 stream_id,
457 producer_id,
458 } => write!(
459 f,
460 "snapshot stream '{stream_id}' contains duplicate producer '{producer_id}'"
461 ),
462 Self::MissingBucket(stream_id) => {
463 write!(
464 f,
465 "snapshot stream '{stream_id}' references a missing bucket"
466 )
467 }
468 Self::PayloadLengthMismatch {
469 stream_id,
470 tail_offset,
471 payload_len,
472 } => write!(
473 f,
474 "snapshot stream '{stream_id}' tail offset {tail_offset} does not match payload length {payload_len}"
475 ),
476 Self::MessageBoundaryMismatch { stream_id } => write!(
477 f,
478 "snapshot stream '{stream_id}' has inconsistent message boundaries"
479 ),
480 Self::SnapshotOffsetOutOfRange {
481 stream_id,
482 snapshot_offset,
483 tail_offset,
484 } => write!(
485 f,
486 "snapshot stream '{stream_id}' visible snapshot offset {snapshot_offset} is beyond tail offset {tail_offset}"
487 ),
488 }
489 }
490}
491
492impl std::error::Error for StreamSnapshotError {}
493
/// In-memory stream storage driven by [`StreamCommand`]s.
///
/// Every per-stream map is keyed by [`BucketStreamId`]. `streams` also keeps metadata
/// for soft-deleted streams; read paths answer `StreamGone` for those.
/// `plan_next_cold_flush_batch` clones the whole machine to plan against a scratch copy,
/// so that clone duplicates every hot payload.
#[derive(Debug, Clone, Default)]
pub struct StreamStateMachine {
    buckets: HashSet<String>,
    // Metadata for every known stream, including soft-deleted ones.
    streams: HashMap<BucketStreamId, StreamMetadata>,
    // Hot (in-memory) bytes per stream; `hot_segments` index into these buffers.
    payloads: HashMap<BucketStreamId, Vec<u8>>,
    hot_segments: HashMap<BucketStreamId, Vec<HotPayloadSegment>>,
    // Missing entries read as 0; maintained via `refresh_hot_start_offset`.
    hot_start_offsets: HashMap<BucketStreamId, u64>,
    cold_chunks: HashMap<BucketStreamId, Vec<ColdChunkRef>>,
    external_segments: HashMap<BucketStreamId, Vec<ObjectPayloadRef>>,
    message_records: HashMap<BucketStreamId, Vec<StreamMessageRecord>>,
    // At most one visible snapshot is kept per stream.
    visible_snapshots: HashMap<BucketStreamId, StreamVisibleSnapshot>,
    // Producer state per stream, keyed by producer id.
    producers: HashMap<BucketStreamId, HashMap<String, ProducerState>>,
}
507
508impl StreamStateMachine {
509 pub fn new() -> Self {
510 Self::default()
511 }
512
    /// Applies one command and returns its outcome.
    ///
    /// Each variant is destructured and forwarded to its handler. Failures come back
    /// in-band as [`StreamResponse::Error`] rather than as a `Result`.
    pub fn apply(&mut self, command: StreamCommand) -> StreamResponse {
        match command {
            StreamCommand::CreateBucket { bucket_id } => self.create_bucket(bucket_id),
            StreamCommand::DeleteBucket { bucket_id } => self.delete_bucket(&bucket_id),
            StreamCommand::CreateStream {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_stream(CreateStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            StreamCommand::CreateExternal {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_external_stream(CreateExternalStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            StreamCommand::Append {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_borrowed(AppendStreamInput {
                stream_id,
                // Borrowed from the owned command fields, which live until this arm ends.
                content_type: content_type.as_deref(),
                payload: &payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            StreamCommand::AppendExternal {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_external(AppendExternalInput {
                stream_id,
                content_type: content_type.as_deref(),
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            StreamCommand::AppendBatch {
                stream_id,
                content_type,
                payloads,
                producer,
                now_ms,
            } => match self.append_batch_borrowed(
                stream_id,
                content_type.as_deref(),
                // Borrowed views of each owned payload for the borrowed batch API.
                &payloads.iter().map(Vec::as_slice).collect::<Vec<_>>(),
                producer,
                now_ms,
            ) {
                // Only the last item's offsets are reported for the whole batch.
                // NOTE(review): `producer` is always `None` here because the request's
                // producer was moved into `append_batch_borrowed`; confirm that batch
                // responses are not expected to echo it back.
                Ok(batch) => batch
                    .items
                    .last()
                    .map(|item| StreamResponse::Appended {
                        offset: item.offset,
                        next_offset: item.next_offset,
                        closed: item.closed,
                        deduplicated: item.deduplicated,
                        producer: None,
                    })
                    .unwrap_or_else(|| {
                        // A successful batch with no items is reported as an empty append.
                        StreamResponse::error(
                            StreamErrorCode::EmptyAppend,
                            "append batch must contain at least one payload",
                        )
                    }),
                Err(response) => response,
            },
            StreamCommand::PublishSnapshot {
                stream_id,
                snapshot_offset,
                content_type,
                payload,
                now_ms,
            } => self.publish_snapshot(stream_id, snapshot_offset, content_type, payload, now_ms),
            StreamCommand::TouchStreamAccess {
                stream_id,
                now_ms,
                renew_ttl,
            } => self.touch_stream_access(&stream_id, now_ms, renew_ttl),
            StreamCommand::AddForkRef { stream_id, now_ms } => {
                self.add_fork_ref(&stream_id, now_ms)
            }
            StreamCommand::ReleaseForkRef { stream_id } => self.release_fork_ref(&stream_id),
            StreamCommand::FlushCold { stream_id, chunk } => self.flush_cold(stream_id, chunk),
            StreamCommand::Close {
                stream_id,
                stream_seq,
                producer,
                now_ms,
            } => self.close(stream_id, stream_seq, producer, now_ms),
            StreamCommand::DeleteStream { stream_id } => self.delete_stream(&stream_id),
        }
    }
658
659 pub fn head(&self, stream_id: &BucketStreamId) -> Option<&StreamMetadata> {
660 self.streams.get(stream_id)
661 }
662
663 pub fn head_at(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> Option<&StreamMetadata> {
664 self.expire_stream_if_due(stream_id, now_ms);
665 self.streams.get(stream_id)
666 }
667
668 pub fn access_requires_write(
669 &self,
670 stream_id: &BucketStreamId,
671 now_ms: u64,
672 renew_ttl: bool,
673 ) -> Result<bool, StreamResponse> {
674 self.validate_stream_scope(stream_id)?;
675 let Some(stream) = self.streams.get(stream_id) else {
676 return Err(StreamResponse::error(
677 StreamErrorCode::StreamNotFound,
678 format!("stream '{stream_id}' does not exist"),
679 ));
680 };
681 if is_soft_deleted(stream) {
682 return Err(StreamResponse::error(
683 StreamErrorCode::StreamGone,
684 format!("stream '{stream_id}' is gone"),
685 ));
686 }
687 if stream_is_expired(stream, now_ms) {
688 return Ok(true);
689 }
690 Ok(renew_ttl
691 && stream.stream_ttl_seconds.is_some()
692 && stream.last_ttl_touch_at_ms != now_ms)
693 }
694
695 pub fn hot_start_offset(&self, stream_id: &BucketStreamId) -> u64 {
696 self.hot_start_offsets.get(stream_id).copied().unwrap_or(0)
697 }
698
699 pub fn cold_chunks(&self, stream_id: &BucketStreamId) -> &[ColdChunkRef] {
700 self.cold_chunks
701 .get(stream_id)
702 .map(Vec::as_slice)
703 .unwrap_or(&[])
704 }
705
706 pub fn external_segments(&self, stream_id: &BucketStreamId) -> &[ObjectPayloadRef] {
707 self.external_segments
708 .get(stream_id)
709 .map(Vec::as_slice)
710 .unwrap_or(&[])
711 }
712
713 pub fn hot_segments(&self, stream_id: &BucketStreamId) -> &[HotPayloadSegment] {
714 self.hot_segments
715 .get(stream_id)
716 .map(Vec::as_slice)
717 .unwrap_or(&[])
718 }
719
720 pub fn hot_payload_len(&self, stream_id: &BucketStreamId) -> Result<u64, StreamResponse> {
721 let Some(stream) = self.streams.get(stream_id) else {
722 return Err(StreamResponse::error(
723 StreamErrorCode::StreamNotFound,
724 format!("stream '{stream_id}' does not exist"),
725 ));
726 };
727 if is_soft_deleted(stream) {
728 return Err(StreamResponse::error(
729 StreamErrorCode::StreamGone,
730 format!("stream '{stream_id}' is gone"),
731 ));
732 }
733 let payload = self
734 .payloads
735 .get(stream_id)
736 .expect("payload vector exists for stream metadata");
737 Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
738 }
739
740 pub fn total_hot_payload_bytes(&self) -> u64 {
741 self.payloads
742 .values()
743 .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
744 .sum()
745 }
746
747 pub fn plan_cold_flush(
748 &self,
749 stream_id: &BucketStreamId,
750 min_hot_bytes: usize,
751 max_flush_bytes: usize,
752 ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
753 if max_flush_bytes == 0 {
754 return Ok(None);
755 }
756 let Some(stream) = self.streams.get(stream_id) else {
757 return Err(StreamResponse::error(
758 StreamErrorCode::StreamNotFound,
759 format!("stream '{stream_id}' does not exist"),
760 ));
761 };
762 if is_soft_deleted(stream) {
763 return Err(StreamResponse::error(
764 StreamErrorCode::StreamGone,
765 format!("stream '{stream_id}' is gone"),
766 ));
767 }
768 let Some(first_segment) = self.hot_segments(stream_id).first() else {
769 return Ok(None);
770 };
771 let mut payload_end = first_segment.payload_start;
772 let mut end_offset = first_segment.start_offset;
773 let mut flush_len = 0usize;
774 for segment in self.hot_segments(stream_id) {
775 if segment.start_offset != end_offset || segment.payload_start != payload_end {
776 break;
777 }
778 let remaining = max_flush_bytes.saturating_sub(flush_len);
779 if remaining == 0 {
780 break;
781 }
782 let segment_len = segment.payload_end - segment.payload_start;
783 let take = segment_len.min(remaining);
784 flush_len += take;
785 payload_end += take;
786 end_offset = end_offset.saturating_add(u64::try_from(take).expect("take fits u64"));
787 if take < segment_len {
788 break;
789 }
790 }
791 if flush_len < min_hot_bytes {
792 return Ok(None);
793 }
794 let payload = self
795 .payloads
796 .get(stream_id)
797 .expect("payload vector exists for stream metadata");
798 let start_offset = first_segment.start_offset;
799 Ok(Some(ColdFlushCandidate {
800 stream_id: stream_id.clone(),
801 start_offset,
802 end_offset,
803 payload: payload[first_segment.payload_start..payload_end].to_vec(),
804 }))
805 }
806
807 pub fn plan_next_cold_flush(
808 &self,
809 min_hot_bytes: usize,
810 max_flush_bytes: usize,
811 ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
812 if max_flush_bytes == 0 {
813 return Ok(None);
814 }
815 let mut stream_ids = self.streams.keys().cloned().collect::<Vec<_>>();
816 stream_ids.sort_by(compare_stream_ids);
817 for stream_id in stream_ids {
818 match self.plan_cold_flush(&stream_id, min_hot_bytes, max_flush_bytes) {
819 Ok(Some(candidate)) => return Ok(Some(candidate)),
820 Ok(None) => {}
821 Err(StreamResponse::Error {
822 code: StreamErrorCode::StreamGone | StreamErrorCode::StreamNotFound,
823 ..
824 }) => {}
825 Err(err) => return Err(err),
826 }
827 }
828 Ok(None)
829 }
830
831 pub fn plan_next_cold_flush_batch(
832 &self,
833 min_hot_bytes: usize,
834 max_flush_bytes: usize,
835 max_candidates: usize,
836 ) -> Result<Vec<ColdFlushCandidate>, StreamResponse> {
837 if max_candidates == 0 || max_flush_bytes == 0 {
838 return Ok(Vec::new());
839 }
840 let mut preview = self.clone();
841 let mut candidates = Vec::with_capacity(max_candidates);
842 while candidates.len() < max_candidates {
843 let Some(candidate) = preview.plan_next_cold_flush(min_hot_bytes, max_flush_bytes)?
844 else {
845 break;
846 };
847 let chunk = ColdChunkRef {
848 start_offset: candidate.start_offset,
849 end_offset: candidate.end_offset,
850 s3_path: "planned-cold-flush-batch".to_owned(),
851 object_size: u64::try_from(candidate.payload.len()).expect("payload len fits u64"),
852 };
853 match preview.flush_cold(candidate.stream_id.clone(), chunk) {
854 StreamResponse::ColdFlushed { .. } => candidates.push(candidate),
855 StreamResponse::Error { .. } => break,
856 other => {
857 return Err(StreamResponse::error(
858 StreamErrorCode::InvalidColdFlush,
859 format!("unexpected cold flush planning response: {other:?}"),
860 ));
861 }
862 }
863 }
864 Ok(candidates)
865 }
866
867 pub fn bucket_exists(&self, bucket_id: &str) -> bool {
868 self.buckets.contains(bucket_id)
869 }
870
871 pub fn snapshot(&self) -> StreamSnapshot {
872 let mut buckets = self.buckets.iter().cloned().collect::<Vec<_>>();
873 buckets.sort();
874
875 let mut streams = self
876 .streams
877 .values()
878 .cloned()
879 .map(|metadata| {
880 let stream_id = metadata.stream_id.clone();
881 let payload = self
882 .payloads
883 .get(&stream_id)
884 .expect("payload vector exists for stream metadata")
885 .clone();
886 let producer_states = self.producer_snapshot(&stream_id);
887 StreamSnapshotEntry {
888 metadata,
889 hot_start_offset: self.hot_start_offset(&stream_id),
890 payload,
891 hot_segments: self
892 .hot_segments
893 .get(&stream_id)
894 .cloned()
895 .unwrap_or_default(),
896 cold_chunks: self
897 .cold_chunks
898 .get(&stream_id)
899 .cloned()
900 .unwrap_or_default(),
901 external_segments: self
902 .external_segments
903 .get(&stream_id)
904 .cloned()
905 .unwrap_or_default(),
906 message_records: self
907 .message_records
908 .get(&stream_id)
909 .cloned()
910 .unwrap_or_default(),
911 visible_snapshot: self.visible_snapshots.get(&stream_id).cloned(),
912 producer_states,
913 }
914 })
915 .collect::<Vec<_>>();
916 streams.sort_by(|left, right| {
917 compare_stream_ids(&left.metadata.stream_id, &right.metadata.stream_id)
918 });
919
920 StreamSnapshot { buckets, streams }
921 }
922
    /// Rebuilds a state machine from a [`StreamSnapshot`], validating every entry.
    ///
    /// Duplicate bucket names are rejected first (`DuplicateBucket`). Then each stream
    /// entry is checked in this order, and the first failure is returned:
    /// 1. its bucket is listed in the snapshot (`MissingBucket`);
    /// 2. its visible snapshot, if any, is not beyond `tail_offset`
    ///    (`SnapshotOffsetOutOfRange`);
    /// 3. hot segments are consistent with the payload length, and hot, cold and external
    ///    sources cover the retained range (`PayloadLengthMismatch`);
    /// 4. message records cover the retained range (`MessageBoundaryMismatch`);
    /// 5. the stream id is not repeated (`DuplicateStream`);
    /// 6. producer states pass `restore_producer_states`.
    ///
    /// # Errors
    /// Returns the first [`StreamSnapshotError`] found. The partially built machine is
    /// discarded.
    pub fn restore(snapshot: StreamSnapshot) -> Result<Self, StreamSnapshotError> {
        let mut machine = Self::default();
        for bucket_id in snapshot.buckets {
            if !machine.buckets.insert(bucket_id.clone()) {
                return Err(StreamSnapshotError::DuplicateBucket(bucket_id));
            }
        }

        for entry in snapshot.streams {
            let stream_id = entry.metadata.stream_id.clone();
            if !machine.buckets.contains(&stream_id.bucket_id) {
                return Err(StreamSnapshotError::MissingBucket(stream_id));
            }
            if let Some(snapshot) = entry.visible_snapshot.as_ref()
                && snapshot.offset > entry.metadata.tail_offset
            {
                return Err(StreamSnapshotError::SnapshotOffsetOutOfRange {
                    stream_id,
                    snapshot_offset: snapshot.offset,
                    tail_offset: entry.metadata.tail_offset,
                });
            }
            // Data before the visible snapshot need not be backed by payload sources or
            // message records; coverage is only checked from this offset to the tail.
            let retained_offset = entry
                .visible_snapshot
                .as_ref()
                .map(|snapshot| snapshot.offset)
                .unwrap_or(0);
            // With no recorded hot segments but a non-empty payload, treat the whole
            // payload as one segment spanning [hot_start_offset, tail_offset).
            // NOTE(review): presumably for snapshots written before `hot_segments` existed.
            let hot_segments = if entry.hot_segments.is_empty() && !entry.payload.is_empty() {
                vec![HotPayloadSegment {
                    start_offset: entry.hot_start_offset,
                    end_offset: entry.metadata.tail_offset,
                    payload_start: 0,
                    payload_end: entry.payload.len(),
                }]
            } else {
                entry.hot_segments
            };
            // Both layout failures are reported as `PayloadLengthMismatch`.
            if !hot_segments_match_payload(&hot_segments, entry.payload.len())
                || !payload_sources_cover_retained_suffix(
                    &entry.cold_chunks,
                    &entry.external_segments,
                    &hot_segments,
                    retained_offset,
                    entry.metadata.tail_offset,
                )
            {
                return Err(StreamSnapshotError::PayloadLengthMismatch {
                    stream_id,
                    tail_offset: entry.metadata.tail_offset,
                    payload_len: entry.payload.len(),
                });
            }
            if !message_records_cover_retained_suffix(
                &entry.message_records,
                retained_offset,
                entry.metadata.tail_offset,
            ) {
                return Err(StreamSnapshotError::MessageBoundaryMismatch { stream_id });
            }
            if machine
                .streams
                .insert(entry.metadata.stream_id.clone(), entry.metadata)
                .is_some()
            {
                return Err(StreamSnapshotError::DuplicateStream(stream_id));
            }
            let producer_states = restore_producer_states(&stream_id, entry.producer_states)?;
            // Empty collections are left out of the maps; accessors treat a missing entry
            // as empty.
            if !hot_segments.is_empty() {
                machine.hot_segments.insert(stream_id.clone(), hot_segments);
            }
            if !entry.cold_chunks.is_empty() {
                machine
                    .cold_chunks
                    .insert(stream_id.clone(), entry.cold_chunks);
            }
            if !entry.external_segments.is_empty() {
                machine
                    .external_segments
                    .insert(stream_id.clone(), entry.external_segments);
            }
            if !entry.message_records.is_empty() {
                machine
                    .message_records
                    .insert(stream_id.clone(), entry.message_records);
            }
            if let Some(snapshot) = entry.visible_snapshot {
                machine
                    .visible_snapshots
                    .insert(stream_id.clone(), snapshot);
            }
            machine.payloads.insert(stream_id.clone(), entry.payload);
            machine.producers.insert(stream_id.clone(), producer_states);
            // Recompute the cached hot start offset from the restored segments.
            machine.refresh_hot_start_offset(&stream_id);
        }

        Ok(machine)
    }
1020
1021 pub fn read(
1022 &self,
1023 stream_id: &BucketStreamId,
1024 offset: u64,
1025 max_len: usize,
1026 ) -> Result<StreamRead, StreamResponse> {
1027 let plan = self.read_plan(stream_id, offset, max_len)?;
1028 if plan
1029 .segments
1030 .iter()
1031 .any(|segment| matches!(segment, StreamReadSegment::Object(_)))
1032 {
1033 return Err(StreamResponse::error_with_next_offset(
1034 StreamErrorCode::InvalidColdFlush,
1035 format!("stream '{stream_id}' read requires object payload store"),
1036 plan.next_offset,
1037 ));
1038 }
1039 let payload = plan
1040 .segments
1041 .iter()
1042 .flat_map(|segment| match segment {
1043 StreamReadSegment::Hot(payload) => payload.as_slice(),
1044 StreamReadSegment::Object(_) => unreachable!("object segments checked above"),
1045 })
1046 .copied()
1047 .collect();
1048 Ok(StreamRead {
1049 offset: plan.offset,
1050 next_offset: plan.next_offset,
1051 content_type: plan.content_type,
1052 payload,
1053 up_to_date: plan.up_to_date,
1054 closed: plan.closed,
1055 })
1056 }
1057
1058 pub fn read_plan(
1059 &self,
1060 stream_id: &BucketStreamId,
1061 offset: u64,
1062 max_len: usize,
1063 ) -> Result<StreamReadPlan, StreamResponse> {
1064 self.read_plan_at(stream_id, offset, max_len, 0)
1065 }
1066
1067 pub fn read_plan_at(
1068 &self,
1069 stream_id: &BucketStreamId,
1070 offset: u64,
1071 max_len: usize,
1072 now_ms: u64,
1073 ) -> Result<StreamReadPlan, StreamResponse> {
1074 let Some(stream) = self.streams.get(stream_id) else {
1075 return Err(StreamResponse::error(
1076 StreamErrorCode::StreamNotFound,
1077 format!("stream '{stream_id}' does not exist"),
1078 ));
1079 };
1080 if is_soft_deleted(stream) {
1081 return Err(StreamResponse::error(
1082 StreamErrorCode::StreamGone,
1083 format!("stream '{stream_id}' is gone"),
1084 ));
1085 }
1086 if stream_is_expired(stream, now_ms) {
1087 return Err(StreamResponse::error(
1088 StreamErrorCode::StreamNotFound,
1089 format!("stream '{stream_id}' does not exist"),
1090 ));
1091 }
1092 if offset > stream.tail_offset {
1093 return Err(StreamResponse::error_with_next_offset(
1094 StreamErrorCode::OffsetOutOfRange,
1095 format!(
1096 "offset {offset} is beyond stream '{}' tail {}",
1097 stream_id, stream.tail_offset
1098 ),
1099 stream.tail_offset,
1100 ));
1101 }
1102 let retained_offset = self.earliest_retained_offset(stream_id);
1103 if offset < retained_offset {
1104 return Err(StreamResponse::error_with_next_offset(
1105 StreamErrorCode::StreamGone,
1106 format!(
1107 "offset {offset} is older than stream '{}' retained offset {retained_offset}",
1108 stream_id
1109 ),
1110 retained_offset,
1111 ));
1112 }
1113
1114 let max_len_u64 = u64::try_from(max_len).unwrap_or(u64::MAX);
1115 let next_offset = stream.tail_offset.min(offset.saturating_add(max_len_u64));
1116 let payload = self
1117 .payloads
1118 .get(stream_id)
1119 .expect("payload vector exists for stream metadata");
1120 let mut segments = Vec::<(u64, StreamReadSegment)>::new();
1121 for chunk in self.cold_chunks(stream_id) {
1122 let start = offset.max(chunk.start_offset);
1123 let end = next_offset.min(chunk.end_offset);
1124 if start < end {
1125 segments.push((
1126 start,
1127 StreamReadSegment::Object(StreamReadObjectSegment {
1128 object: ObjectPayloadRef::from(chunk),
1129 read_start_offset: start,
1130 len: usize::try_from(end - start).expect("object read len fits usize"),
1131 }),
1132 ));
1133 }
1134 }
1135 for object in self.external_segments(stream_id) {
1136 let start = offset.max(object.start_offset);
1137 let end = next_offset.min(object.end_offset);
1138 if start < end {
1139 segments.push((
1140 start,
1141 StreamReadSegment::Object(StreamReadObjectSegment {
1142 object: object.clone(),
1143 read_start_offset: start,
1144 len: usize::try_from(end - start).expect("object read len fits usize"),
1145 }),
1146 ));
1147 }
1148 }
1149 for segment in self.hot_segments(stream_id) {
1150 let start = offset.max(segment.start_offset);
1151 let end = next_offset.min(segment.end_offset);
1152 if start < end {
1153 let payload_start = segment.payload_start
1154 + usize::try_from(start - segment.start_offset)
1155 .expect("hot segment start fits usize");
1156 let payload_end = segment.payload_start
1157 + usize::try_from(end - segment.start_offset)
1158 .expect("hot segment end fits usize");
1159 segments.push((
1160 start,
1161 StreamReadSegment::Hot(payload[payload_start..payload_end].to_vec()),
1162 ));
1163 }
1164 }
1165 segments.sort_by_key(|(start, _)| *start);
1166 if !segments_cover_range(&segments, offset, next_offset) {
1167 return Err(StreamResponse::error_with_next_offset(
1168 StreamErrorCode::InvalidColdFlush,
1169 format!("stream '{stream_id}' has missing payload segment metadata"),
1170 next_offset,
1171 ));
1172 }
1173 Ok(StreamReadPlan {
1174 offset,
1175 next_offset,
1176 content_type: stream.content_type.clone(),
1177 segments: segments.into_iter().map(|(_, segment)| segment).collect(),
1178 up_to_date: next_offset == stream.tail_offset,
1179 closed: stream.status == StreamStatus::Closed,
1180 })
1181 }
1182
1183 pub fn latest_snapshot(
1184 &self,
1185 stream_id: &BucketStreamId,
1186 ) -> Result<Option<StreamVisibleSnapshot>, StreamResponse> {
1187 let Some(stream) = self.streams.get(stream_id) else {
1188 return Err(StreamResponse::error(
1189 StreamErrorCode::StreamNotFound,
1190 format!("stream '{stream_id}' does not exist"),
1191 ));
1192 };
1193 if is_soft_deleted(stream) {
1194 return Err(StreamResponse::error(
1195 StreamErrorCode::StreamGone,
1196 format!("stream '{stream_id}' is gone"),
1197 ));
1198 }
1199 Ok(self.visible_snapshots.get(stream_id).cloned())
1200 }
1201
1202 pub fn read_snapshot(
1203 &self,
1204 stream_id: &BucketStreamId,
1205 snapshot_offset: u64,
1206 ) -> Result<StreamVisibleSnapshot, StreamResponse> {
1207 let snapshot = self.latest_snapshot(stream_id)?;
1208 match snapshot {
1209 Some(snapshot) if snapshot.offset == snapshot_offset => Ok(snapshot),
1210 _ => Err(StreamResponse::error(
1211 StreamErrorCode::SnapshotNotFound,
1212 format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
1213 )),
1214 }
1215 }
1216
1217 pub fn delete_snapshot(
1218 &self,
1219 stream_id: &BucketStreamId,
1220 snapshot_offset: u64,
1221 ) -> StreamResponse {
1222 match self.latest_snapshot(stream_id) {
1223 Ok(Some(snapshot)) if snapshot.offset == snapshot_offset => StreamResponse::error(
1224 StreamErrorCode::SnapshotConflict,
1225 format!(
1226 "snapshot {snapshot_offset} for stream '{stream_id}' is the latest visible snapshot"
1227 ),
1228 ),
1229 Ok(_) => StreamResponse::error(
1230 StreamErrorCode::SnapshotNotFound,
1231 format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
1232 ),
1233 Err(err) => err,
1234 }
1235 }
1236
1237 pub fn bootstrap_plan(
1238 &self,
1239 stream_id: &BucketStreamId,
1240 ) -> Result<StreamBootstrapPlan, StreamResponse> {
1241 let Some(stream) = self.streams.get(stream_id) else {
1242 return Err(StreamResponse::error(
1243 StreamErrorCode::StreamNotFound,
1244 format!("stream '{stream_id}' does not exist"),
1245 ));
1246 };
1247 if is_soft_deleted(stream) {
1248 return Err(StreamResponse::error(
1249 StreamErrorCode::StreamGone,
1250 format!("stream '{stream_id}' is gone"),
1251 ));
1252 }
1253 let snapshot = self.visible_snapshots.get(stream_id).cloned();
1254 let retained_offset = snapshot
1255 .as_ref()
1256 .map(|snapshot| snapshot.offset)
1257 .unwrap_or(0);
1258 let updates = self
1259 .message_records
1260 .get(stream_id)
1261 .map(|records| {
1262 records
1263 .iter()
1264 .filter(|record| record.start_offset >= retained_offset)
1265 .cloned()
1266 .collect::<Vec<_>>()
1267 })
1268 .unwrap_or_default();
1269 Ok(StreamBootstrapPlan {
1270 snapshot,
1271 updates,
1272 next_offset: stream.tail_offset,
1273 content_type: stream.content_type.clone(),
1274 up_to_date: true,
1275 closed: stream.status == StreamStatus::Closed,
1276 })
1277 }
1278
1279 fn publish_snapshot(
1280 &mut self,
1281 stream_id: BucketStreamId,
1282 snapshot_offset: u64,
1283 content_type: String,
1284 payload: Vec<u8>,
1285 now_ms: u64,
1286 ) -> StreamResponse {
1287 if let Err(response) = self.validate_stream_scope(&stream_id) {
1288 return response;
1289 }
1290 if content_type.trim().is_empty() {
1291 return StreamResponse::error(
1292 StreamErrorCode::InvalidSnapshot,
1293 "snapshot content type must not be empty",
1294 );
1295 }
1296 let Some(stream) = self.streams.get(&stream_id) else {
1297 return StreamResponse::error(
1298 StreamErrorCode::StreamNotFound,
1299 format!("stream '{stream_id}' does not exist"),
1300 );
1301 };
1302 if is_soft_deleted(stream) {
1303 return StreamResponse::error(
1304 StreamErrorCode::StreamGone,
1305 format!("stream '{stream_id}' is gone"),
1306 );
1307 }
1308 if stream_is_expired(stream, now_ms) {
1309 self.remove_stream_state(&stream_id);
1310 return StreamResponse::error(
1311 StreamErrorCode::StreamNotFound,
1312 format!("stream '{stream_id}' does not exist"),
1313 );
1314 }
1315 let tail_offset = stream.tail_offset;
1316 let retained_offset = self.earliest_retained_offset(&stream_id);
1317 if snapshot_offset < retained_offset {
1318 return StreamResponse::error_with_next_offset(
1319 StreamErrorCode::StreamGone,
1320 format!(
1321 "snapshot offset {snapshot_offset} is older than stream '{}' retained offset {retained_offset}",
1322 stream_id
1323 ),
1324 retained_offset,
1325 );
1326 }
1327 if snapshot_offset > tail_offset {
1328 return StreamResponse::error_with_next_offset(
1329 StreamErrorCode::SnapshotConflict,
1330 format!(
1331 "snapshot offset {snapshot_offset} is beyond stream '{}' tail {tail_offset}",
1332 stream_id
1333 ),
1334 tail_offset,
1335 );
1336 }
1337 if !self.snapshot_offset_aligned(&stream_id, snapshot_offset, retained_offset) {
1338 return StreamResponse::error_with_next_offset(
1339 StreamErrorCode::InvalidSnapshot,
1340 format!(
1341 "snapshot offset {snapshot_offset} is not aligned to a committed message boundary for stream '{stream_id}'"
1342 ),
1343 tail_offset,
1344 );
1345 }
1346
1347 self.visible_snapshots.insert(
1348 stream_id.clone(),
1349 StreamVisibleSnapshot {
1350 offset: snapshot_offset,
1351 content_type,
1352 payload,
1353 },
1354 );
1355 self.compact_retained_prefix(&stream_id, snapshot_offset);
1356 StreamResponse::SnapshotPublished { snapshot_offset }
1357 }
1358
    /// Moves the hot (in-memory) bytes covered by `chunk` out of the payload buffer and
    /// records `chunk` as a cold chunk for `stream_id`.
    ///
    /// Requirements on `chunk`:
    /// * `chunk.start_offset` must equal the start offset of an existing hot segment.
    /// * `[chunk.start_offset, chunk.end_offset)` must be covered by hot segments that are
    ///   contiguous both in stream offsets and in their `payload_start` positions.
    /// * The chunk may end in the middle of a segment.
    ///
    /// On success:
    /// * the covered bytes are drained from the stream's payload vector;
    /// * hot segment bookkeeping is updated via `remove_drained_hot_range`;
    /// * the chunk is pushed onto `cold_chunks`;
    /// * `ColdFlushed` is returned with the refreshed hot start offset.
    ///
    /// # Errors
    ///
    /// * `InvalidColdFlush` for an empty S3 path, a zero object size, an empty chunk, a chunk
    ///   past the tail, a chunk not starting on a hot segment, a gap or non-contiguous
    ///   payload metadata, a chunk longer than the hot data, or lengths that overflow `usize`.
    /// * `StreamNotFound` / `StreamGone` for missing or soft-deleted streams.
    fn flush_cold(&mut self, stream_id: BucketStreamId, chunk: ColdChunkRef) -> StreamResponse {
        if let Err(response) = self.validate_stream_scope(&stream_id) {
            return response;
        }
        if chunk.s3_path.trim().is_empty() {
            return StreamResponse::error(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk S3 path must not be empty",
            );
        }
        if chunk.object_size == 0 {
            return StreamResponse::error(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk object size must be greater than zero",
            );
        }
        let Some(stream) = self.streams.get(&stream_id) else {
            return StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            );
        };
        if is_soft_deleted(stream) {
            return StreamResponse::error(
                StreamErrorCode::StreamGone,
                format!("stream '{stream_id}' is gone"),
            );
        }
        if chunk.end_offset <= chunk.start_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk must cover at least one byte",
                stream.tail_offset,
            );
        }
        if chunk.end_offset > stream.tail_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk end {} is beyond stream '{}' tail {}",
                    chunk.end_offset, stream_id, stream.tail_offset
                ),
                stream.tail_offset,
            );
        }
        // The chunk must begin exactly on a hot segment boundary; partial-start flushes are rejected.
        let segments = self.hot_segments(&stream_id);
        let Some(segment_index) = segments
            .iter()
            .position(|segment| segment.start_offset == chunk.start_offset)
        else {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk for stream '{stream_id}' does not match the start of a hot payload segment"
                ),
                stream.tail_offset,
            );
        };

        // Walk forward from the matching segment and add up how many payload bytes the chunk
        // covers. `covered_offset` is the stream offset reached so far, and `flush_len` is the
        // number of payload bytes counted from `drain_start`.
        let drain_start = segments[segment_index].payload_start;
        let mut covered_offset = chunk.start_offset;
        let mut flush_len = 0usize;
        for segment in segments.iter().skip(segment_index) {
            // A gap in stream offsets ends the walk; the coverage check after the loop reports it.
            if segment.start_offset != covered_offset {
                break;
            }
            // The last covered segment may be consumed only partially.
            let segment_cover_end = chunk.end_offset.min(segment.end_offset);
            let segment_flush_len = match usize::try_from(segment_cover_end - segment.start_offset)
            {
                Ok(segment_flush_len) => segment_flush_len,
                Err(_) => {
                    return StreamResponse::error_with_next_offset(
                        StreamErrorCode::InvalidColdFlush,
                        "cold chunk length does not fit in memory",
                        stream.tail_offset,
                    );
                }
            };
            // Each segment's bytes must follow the previous segment's bytes directly in the
            // payload buffer, so a single `drain(drain_start..drain_end)` removes exactly them.
            let Some(expected_payload_start) = drain_start.checked_add(flush_len) else {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    "cold chunk length does not fit in memory",
                    stream.tail_offset,
                );
            };
            if segment.payload_start != expected_payload_start {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    format!("stream '{stream_id}' has non-contiguous hot payload metadata"),
                    stream.tail_offset,
                );
            }
            // Guard against offset metadata claiming more bytes than the segment stores.
            let segment_payload_len = segment.payload_end - segment.payload_start;
            if segment_flush_len > segment_payload_len {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    format!("cold chunk length exceeds stream '{stream_id}' hot segment metadata"),
                    stream.tail_offset,
                );
            }
            let Some(new_flush_len) = flush_len.checked_add(segment_flush_len) else {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    "cold chunk length does not fit in memory",
                    stream.tail_offset,
                );
            };
            flush_len = new_flush_len;
            covered_offset = segment_cover_end;
            if covered_offset == chunk.end_offset {
                break;
            }
        }
        // Running out of segments, or hitting a gap, before the chunk end means the chunk
        // does not describe hot data.
        if covered_offset != chunk.end_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk for stream '{stream_id}' does not cover contiguous hot payload segments"
                ),
                stream.tail_offset,
            );
        }
        let Some(drain_end) = drain_start.checked_add(flush_len) else {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk length does not fit in memory",
                stream.tail_offset,
            );
        };
        // Final bounds check so the `drain` below cannot panic on inconsistent metadata.
        let payload_len = self
            .payloads
            .get(&stream_id)
            .expect("payload vector exists for stream metadata")
            .len();
        if drain_end > payload_len {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!("cold chunk length exceeds stream '{stream_id}' hot payload length"),
                stream.tail_offset,
            );
        };

        // All validation passed. Mutate the payload buffer, the hot segment bookkeeping and
        // the cold chunk list.
        self.payloads
            .get_mut(&stream_id)
            .expect("payload vector exists for stream metadata")
            .drain(drain_start..drain_end);
        self.remove_drained_hot_range(
            &stream_id,
            segment_index,
            chunk.end_offset,
            drain_start,
            flush_len,
        );
        self.cold_chunks
            .entry(stream_id.clone())
            .or_default()
            .push(chunk.clone());
        self.refresh_hot_start_offset(&stream_id);
        StreamResponse::ColdFlushed {
            hot_start_offset: self.hot_start_offset(&stream_id),
        }
    }
1521
1522 fn create_bucket(&mut self, bucket_id: String) -> StreamResponse {
1523 if let Err(message) = validate_bucket_id(&bucket_id) {
1524 return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1525 }
1526 if !self.buckets.insert(bucket_id.clone()) {
1527 return StreamResponse::BucketAlreadyExists { bucket_id };
1528 }
1529 StreamResponse::BucketCreated { bucket_id }
1530 }
1531
1532 fn delete_bucket(&mut self, bucket_id: &str) -> StreamResponse {
1533 if let Err(message) = validate_bucket_id(bucket_id) {
1534 return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1535 }
1536 if !self.buckets.contains(bucket_id) {
1537 return StreamResponse::error(
1538 StreamErrorCode::BucketNotFound,
1539 format!("bucket '{bucket_id}' does not exist"),
1540 );
1541 }
1542 if self
1543 .streams
1544 .keys()
1545 .any(|stream_id| stream_id.bucket_id == bucket_id)
1546 {
1547 return StreamResponse::error(
1548 StreamErrorCode::BucketNotEmpty,
1549 format!("bucket '{bucket_id}' is not empty"),
1550 );
1551 }
1552 self.buckets.remove(bucket_id);
1553 StreamResponse::BucketDeleted {
1554 bucket_id: bucket_id.to_owned(),
1555 }
1556 }
1557
1558 fn create_stream(&mut self, input: CreateStreamInput) -> StreamResponse {
1559 if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1560 return response;
1561 }
1562 if let Err(response) =
1563 validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1564 {
1565 return response;
1566 }
1567 if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1568 return response;
1569 }
1570 if let Some(producer) = input.producer.as_ref()
1571 && producer.producer_seq != 0
1572 {
1573 return StreamResponse::error(
1574 StreamErrorCode::ProducerSeqConflict,
1575 format!(
1576 "producer '{}' expected sequence 0, received {}",
1577 producer.producer_id, producer.producer_seq
1578 ),
1579 );
1580 }
1581 if self
1582 .streams
1583 .get(&input.stream_id)
1584 .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1585 {
1586 self.remove_stream_state(&input.stream_id);
1587 }
1588
1589 if let Some(existing) = self.streams.get(&input.stream_id) {
1590 if is_soft_deleted(existing) {
1591 return StreamResponse::error(
1592 StreamErrorCode::StreamAlreadyExistsConflict,
1593 format!(
1594 "stream '{}' is gone and cannot be recreated yet",
1595 input.stream_id
1596 ),
1597 );
1598 }
1599 if existing.content_type == input.content_type
1600 && existing.status == status_from_closed(input.close_after)
1601 && existing.stream_ttl_seconds == input.stream_ttl_seconds
1602 && existing.stream_expires_at_ms == input.stream_expires_at_ms
1603 && existing.forked_from == input.forked_from
1604 && existing.fork_offset == input.fork_offset
1605 {
1606 return StreamResponse::AlreadyExists {
1607 next_offset: existing.tail_offset,
1608 closed: existing.status == StreamStatus::Closed,
1609 content_type: existing.content_type.clone(),
1610 stream_ttl_seconds: existing.stream_ttl_seconds,
1611 stream_expires_at_ms: existing.stream_expires_at_ms,
1612 };
1613 }
1614 return StreamResponse::error(
1615 StreamErrorCode::StreamAlreadyExistsConflict,
1616 format!(
1617 "stream '{}' already exists with different metadata",
1618 input.stream_id
1619 ),
1620 );
1621 }
1622
1623 let initial_len = input.initial_len();
1624 let metadata = StreamMetadata {
1625 stream_id: input.stream_id.clone(),
1626 content_type: input.content_type,
1627 status: status_from_closed(input.close_after),
1628 tail_offset: initial_len,
1629 last_stream_seq: input.stream_seq,
1630 stream_ttl_seconds: input.stream_ttl_seconds,
1631 stream_expires_at_ms: input.stream_expires_at_ms,
1632 created_at_ms: input.now_ms,
1633 last_ttl_touch_at_ms: input.now_ms,
1634 forked_from: input.forked_from,
1635 fork_offset: input.fork_offset,
1636 fork_ref_count: 0,
1637 };
1638 self.streams.insert(input.stream_id.clone(), metadata);
1639 self.payloads
1640 .insert(input.stream_id.clone(), input.initial_payload);
1641 if initial_len > 0 {
1642 self.hot_segments.insert(
1643 input.stream_id.clone(),
1644 vec![HotPayloadSegment {
1645 start_offset: 0,
1646 end_offset: initial_len,
1647 payload_start: 0,
1648 payload_end: usize::try_from(initial_len).expect("payload len fits usize"),
1649 }],
1650 );
1651 self.message_records.insert(
1652 input.stream_id.clone(),
1653 vec![StreamMessageRecord {
1654 start_offset: 0,
1655 end_offset: initial_len,
1656 }],
1657 );
1658 }
1659 let mut producer_states = HashMap::new();
1660 if let Some(producer) = input.producer {
1661 let last_item = ProducerAppendRecord {
1662 start_offset: 0,
1663 next_offset: initial_len,
1664 closed: input.close_after,
1665 };
1666 producer_states.insert(
1667 producer.producer_id,
1668 ProducerState {
1669 producer_epoch: producer.producer_epoch,
1670 producer_seq: producer.producer_seq,
1671 last_start_offset: last_item.start_offset,
1672 last_next_offset: last_item.next_offset,
1673 last_closed: last_item.closed,
1674 last_items: vec![last_item],
1675 },
1676 );
1677 }
1678 self.producers
1679 .insert(input.stream_id.clone(), producer_states);
1680 StreamResponse::Created {
1681 stream_id: input.stream_id,
1682 next_offset: initial_len,
1683 closed: input.close_after,
1684 }
1685 }
1686
1687 fn create_external_stream(&mut self, input: CreateExternalStreamInput) -> StreamResponse {
1688 if let Err(response) = validate_external_payload_ref(&input.initial_payload) {
1689 return response;
1690 }
1691 if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1692 return response;
1693 }
1694 if let Err(response) =
1695 validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1696 {
1697 return response;
1698 }
1699 if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1700 return response;
1701 }
1702 if let Some(producer) = input.producer.as_ref()
1703 && producer.producer_seq != 0
1704 {
1705 return StreamResponse::error(
1706 StreamErrorCode::ProducerSeqConflict,
1707 format!(
1708 "producer '{}' expected sequence 0, received {}",
1709 producer.producer_id, producer.producer_seq
1710 ),
1711 );
1712 }
1713 if self
1714 .streams
1715 .get(&input.stream_id)
1716 .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1717 {
1718 self.remove_stream_state(&input.stream_id);
1719 }
1720
1721 if let Some(existing) = self.streams.get(&input.stream_id) {
1722 if is_soft_deleted(existing) {
1723 return StreamResponse::error(
1724 StreamErrorCode::StreamAlreadyExistsConflict,
1725 format!(
1726 "stream '{}' is gone and cannot be recreated yet",
1727 input.stream_id
1728 ),
1729 );
1730 }
1731 if existing.content_type == input.content_type
1732 && existing.status == status_from_closed(input.close_after)
1733 && existing.stream_ttl_seconds == input.stream_ttl_seconds
1734 && existing.stream_expires_at_ms == input.stream_expires_at_ms
1735 && existing.forked_from == input.forked_from
1736 && existing.fork_offset == input.fork_offset
1737 {
1738 return StreamResponse::AlreadyExists {
1739 next_offset: existing.tail_offset,
1740 closed: existing.status == StreamStatus::Closed,
1741 content_type: existing.content_type.clone(),
1742 stream_ttl_seconds: existing.stream_ttl_seconds,
1743 stream_expires_at_ms: existing.stream_expires_at_ms,
1744 };
1745 }
1746 return StreamResponse::error(
1747 StreamErrorCode::StreamAlreadyExistsConflict,
1748 format!(
1749 "stream '{}' already exists with different metadata",
1750 input.stream_id
1751 ),
1752 );
1753 }
1754
1755 let initial_len = input.initial_payload.payload_len;
1756 let metadata = StreamMetadata {
1757 stream_id: input.stream_id.clone(),
1758 content_type: input.content_type,
1759 status: status_from_closed(input.close_after),
1760 tail_offset: initial_len,
1761 last_stream_seq: input.stream_seq,
1762 stream_ttl_seconds: input.stream_ttl_seconds,
1763 stream_expires_at_ms: input.stream_expires_at_ms,
1764 created_at_ms: input.now_ms,
1765 last_ttl_touch_at_ms: input.now_ms,
1766 forked_from: input.forked_from,
1767 fork_offset: input.fork_offset,
1768 fork_ref_count: 0,
1769 };
1770 self.streams.insert(input.stream_id.clone(), metadata);
1771 self.payloads.insert(input.stream_id.clone(), Vec::new());
1772 self.external_segments.insert(
1773 input.stream_id.clone(),
1774 vec![ObjectPayloadRef {
1775 start_offset: 0,
1776 end_offset: initial_len,
1777 s3_path: input.initial_payload.s3_path,
1778 object_size: input.initial_payload.object_size,
1779 }],
1780 );
1781 self.message_records.insert(
1782 input.stream_id.clone(),
1783 vec![StreamMessageRecord {
1784 start_offset: 0,
1785 end_offset: initial_len,
1786 }],
1787 );
1788 let mut producer_states = HashMap::new();
1789 if let Some(producer) = input.producer {
1790 let last_item = ProducerAppendRecord {
1791 start_offset: 0,
1792 next_offset: initial_len,
1793 closed: input.close_after,
1794 };
1795 producer_states.insert(
1796 producer.producer_id,
1797 ProducerState {
1798 producer_epoch: producer.producer_epoch,
1799 producer_seq: producer.producer_seq,
1800 last_start_offset: last_item.start_offset,
1801 last_next_offset: last_item.next_offset,
1802 last_closed: last_item.closed,
1803 last_items: vec![last_item],
1804 },
1805 );
1806 }
1807 self.producers
1808 .insert(input.stream_id.clone(), producer_states);
1809 StreamResponse::Created {
1810 stream_id: input.stream_id,
1811 next_offset: initial_len,
1812 closed: input.close_after,
1813 }
1814 }
1815
1816 pub fn append_borrowed(&mut self, input: AppendStreamInput<'_>) -> StreamResponse {
1817 let AppendStreamInput {
1818 stream_id,
1819 content_type,
1820 payload,
1821 close_after,
1822 stream_seq,
1823 producer,
1824 now_ms,
1825 } = input;
1826 if let Err(response) = self.validate_stream_scope(&stream_id) {
1827 return response;
1828 }
1829 if let Err(response) = validate_producer_request(producer.as_ref()) {
1830 return response;
1831 }
1832
1833 let Some(_) = self.streams.get(&stream_id) else {
1834 return StreamResponse::error(
1835 StreamErrorCode::StreamNotFound,
1836 format!("stream '{stream_id}' does not exist"),
1837 );
1838 };
1839 if self.expire_stream_if_due(&stream_id, now_ms) {
1840 return StreamResponse::error(
1841 StreamErrorCode::StreamNotFound,
1842 format!("stream '{stream_id}' does not exist"),
1843 );
1844 }
1845 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1846 return StreamResponse::error(
1847 StreamErrorCode::StreamGone,
1848 format!("stream '{stream_id}' is gone"),
1849 );
1850 }
1851 let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1852 Ok(decision) => decision,
1853 Err(response) => return response,
1854 };
1855 if let ProducerDecision::Duplicate {
1856 offset,
1857 next_offset,
1858 closed,
1859 producer,
1860 ..
1861 } = producer_decision
1862 {
1863 if payload.is_empty() {
1864 return StreamResponse::Closed {
1865 next_offset,
1866 deduplicated: true,
1867 producer: Some(producer),
1868 };
1869 }
1870 return StreamResponse::Appended {
1871 offset,
1872 next_offset,
1873 closed,
1874 deduplicated: true,
1875 producer: Some(producer),
1876 };
1877 }
1878
1879 let Some(stream) = self.streams.get_mut(&stream_id) else {
1880 unreachable!("stream existence checked before producer evaluation");
1881 };
1882
1883 if stream.status == StreamStatus::Closed {
1884 if close_after && payload.is_empty() {
1885 return StreamResponse::Closed {
1886 next_offset: stream.tail_offset,
1887 deduplicated: false,
1888 producer: None,
1889 };
1890 }
1891 return StreamResponse::error_with_next_offset(
1892 StreamErrorCode::StreamClosed,
1893 format!("stream '{stream_id}' is closed"),
1894 stream.tail_offset,
1895 );
1896 }
1897
1898 if payload.is_empty() && !close_after {
1899 return StreamResponse::error(
1900 StreamErrorCode::EmptyAppend,
1901 "append payload must be non-empty unless closing the stream",
1902 );
1903 }
1904
1905 if !payload.is_empty() {
1906 let Some(content_type) = content_type else {
1907 return StreamResponse::error(
1908 StreamErrorCode::MissingContentType,
1909 "append with a body must include content type",
1910 );
1911 };
1912 if content_type != stream.content_type {
1913 return StreamResponse::error_with_next_offset(
1914 StreamErrorCode::ContentTypeMismatch,
1915 format!(
1916 "append content type '{content_type}' does not match stream content type '{}'",
1917 stream.content_type
1918 ),
1919 stream.tail_offset,
1920 );
1921 }
1922 }
1923
1924 if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1925 return response;
1926 }
1927
1928 let offset = stream.tail_offset;
1929 let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1930 stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1931 if let Some(seq) = stream_seq {
1932 stream.last_stream_seq = Some(seq);
1933 }
1934 renew_stream_ttl(stream, now_ms);
1935 if close_after {
1936 stream.status = StreamStatus::Closed;
1937 }
1938 let closed = stream.status == StreamStatus::Closed;
1939 let next_offset = stream.tail_offset;
1940 let producer_ack = producer.clone();
1941 if let Some(producer) = producer {
1942 self.record_producer_success(
1943 stream_id.clone(),
1944 producer,
1945 ProducerAppendRecord {
1946 start_offset: offset,
1947 next_offset,
1948 closed,
1949 },
1950 vec![ProducerAppendRecord {
1951 start_offset: offset,
1952 next_offset,
1953 closed,
1954 }],
1955 );
1956 }
1957
1958 if payload.is_empty() {
1959 StreamResponse::Closed {
1960 next_offset,
1961 deduplicated: false,
1962 producer: producer_ack,
1963 }
1964 } else {
1965 let payload_store = self
1966 .payloads
1967 .get_mut(&stream_id)
1968 .expect("payload vector exists for stream metadata");
1969 let payload_start = payload_store.len();
1970 payload_store.extend_from_slice(payload);
1971 let payload_end = payload_store.len();
1972 self.hot_segments
1973 .get_mut(&stream_id)
1974 .map(|segments| {
1975 segments.push(HotPayloadSegment {
1976 start_offset: offset,
1977 end_offset: next_offset,
1978 payload_start,
1979 payload_end,
1980 })
1981 })
1982 .unwrap_or_else(|| {
1983 self.hot_segments.insert(
1984 stream_id.clone(),
1985 vec![HotPayloadSegment {
1986 start_offset: offset,
1987 end_offset: next_offset,
1988 payload_start,
1989 payload_end,
1990 }],
1991 );
1992 });
1993 self.refresh_hot_start_offset(&stream_id);
1994 self.message_records
1995 .entry(stream_id.clone())
1996 .or_default()
1997 .push(StreamMessageRecord {
1998 start_offset: offset,
1999 end_offset: next_offset,
2000 });
2001 StreamResponse::Appended {
2002 offset,
2003 next_offset,
2004 closed: close_after,
2005 deduplicated: false,
2006 producer: producer_ack,
2007 }
2008 }
2009 }
2010
2011 fn append_external(&mut self, input: AppendExternalInput<'_>) -> StreamResponse {
2012 let AppendExternalInput {
2013 stream_id,
2014 content_type,
2015 payload,
2016 close_after,
2017 stream_seq,
2018 producer,
2019 now_ms,
2020 } = input;
2021 if let Err(response) = validate_external_payload_ref(&payload) {
2022 return response;
2023 }
2024 if let Err(response) = self.validate_stream_scope(&stream_id) {
2025 return response;
2026 }
2027 if let Err(response) = validate_producer_request(producer.as_ref()) {
2028 return response;
2029 }
2030 let Some(_) = self.streams.get(&stream_id) else {
2031 return StreamResponse::error(
2032 StreamErrorCode::StreamNotFound,
2033 format!("stream '{stream_id}' does not exist"),
2034 );
2035 };
2036 if self.expire_stream_if_due(&stream_id, now_ms) {
2037 return StreamResponse::error(
2038 StreamErrorCode::StreamNotFound,
2039 format!("stream '{stream_id}' does not exist"),
2040 );
2041 }
2042 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
2043 return StreamResponse::error(
2044 StreamErrorCode::StreamGone,
2045 format!("stream '{stream_id}' is gone"),
2046 );
2047 }
2048 let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
2049 Ok(decision) => decision,
2050 Err(response) => return response,
2051 };
2052 if let ProducerDecision::Duplicate {
2053 offset,
2054 next_offset,
2055 closed,
2056 producer,
2057 ..
2058 } = producer_decision
2059 {
2060 return StreamResponse::Appended {
2061 offset,
2062 next_offset,
2063 closed,
2064 deduplicated: true,
2065 producer: Some(producer),
2066 };
2067 }
2068
2069 let Some(stream) = self.streams.get(&stream_id) else {
2070 unreachable!("stream existence checked before producer evaluation");
2071 };
2072 if stream.status == StreamStatus::Closed {
2073 return StreamResponse::error_with_next_offset(
2074 StreamErrorCode::StreamClosed,
2075 format!("stream '{stream_id}' is closed"),
2076 stream.tail_offset,
2077 );
2078 }
2079 let Some(content_type) = content_type else {
2080 return StreamResponse::error(
2081 StreamErrorCode::MissingContentType,
2082 "append with a body must include content type",
2083 );
2084 };
2085 if content_type != stream.content_type {
2086 return StreamResponse::error_with_next_offset(
2087 StreamErrorCode::ContentTypeMismatch,
2088 format!(
2089 "append content type '{content_type}' does not match stream content type '{}'",
2090 stream.content_type
2091 ),
2092 stream.tail_offset,
2093 );
2094 }
2095 if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
2096 return response;
2097 }
2098 let offset = stream.tail_offset;
2099 let next_offset = offset.saturating_add(payload.payload_len);
2100 let stream = self
2101 .streams
2102 .get_mut(&stream_id)
2103 .expect("stream existence checked before external append mutation");
2104 stream.tail_offset = next_offset;
2105 if let Some(seq) = stream_seq {
2106 stream.last_stream_seq = Some(seq);
2107 }
2108 renew_stream_ttl(stream, now_ms);
2109 if close_after {
2110 stream.status = StreamStatus::Closed;
2111 }
2112 let closed = stream.status == StreamStatus::Closed;
2113 let producer_ack = producer.clone();
2114 if let Some(producer) = producer {
2115 self.record_producer_success(
2116 stream_id.clone(),
2117 producer,
2118 ProducerAppendRecord {
2119 start_offset: offset,
2120 next_offset,
2121 closed,
2122 },
2123 vec![ProducerAppendRecord {
2124 start_offset: offset,
2125 next_offset,
2126 closed,
2127 }],
2128 );
2129 }
2130 self.external_segments
2131 .entry(stream_id.clone())
2132 .or_default()
2133 .push(ObjectPayloadRef {
2134 start_offset: offset,
2135 end_offset: next_offset,
2136 s3_path: payload.s3_path,
2137 object_size: payload.object_size,
2138 });
2139 self.message_records
2140 .entry(stream_id.clone())
2141 .or_default()
2142 .push(StreamMessageRecord {
2143 start_offset: offset,
2144 end_offset: next_offset,
2145 });
2146 StreamResponse::Appended {
2147 offset,
2148 next_offset,
2149 closed: close_after,
2150 deduplicated: false,
2151 producer: producer_ack,
2152 }
2153 }
2154
2155 pub fn append_batch_borrowed(
2156 &mut self,
2157 stream_id: BucketStreamId,
2158 content_type: Option<&str>,
2159 payloads: &[&[u8]],
2160 producer: Option<ProducerRequest>,
2161 now_ms: u64,
2162 ) -> Result<StreamBatchAppend, StreamResponse> {
2163 if payloads.is_empty() {
2164 return Err(StreamResponse::error(
2165 StreamErrorCode::EmptyAppend,
2166 "append batch must contain at least one payload",
2167 ));
2168 }
2169 self.validate_stream_scope(&stream_id)?;
2170 validate_producer_request(producer.as_ref())?;
2171 if self.expire_stream_if_due(&stream_id, now_ms) {
2172 return Err(StreamResponse::error(
2173 StreamErrorCode::StreamNotFound,
2174 format!("stream '{stream_id}' does not exist"),
2175 ));
2176 }
2177 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
2178 return Err(StreamResponse::error(
2179 StreamErrorCode::StreamGone,
2180 format!("stream '{stream_id}' is gone"),
2181 ));
2182 }
2183 let producer_decision = self.evaluate_producer(&stream_id, producer.as_ref())?;
2184 if let ProducerDecision::Duplicate { items, .. } = producer_decision {
2185 return Ok(StreamBatchAppend {
2186 items: items
2187 .into_iter()
2188 .map(|item| StreamBatchAppendItem {
2189 offset: item.start_offset,
2190 next_offset: item.next_offset,
2191 closed: item.closed,
2192 deduplicated: true,
2193 })
2194 .collect(),
2195 deduplicated: true,
2196 });
2197 }
2198
2199 let Some(stream) = self.streams.get_mut(&stream_id) else {
2200 return Err(StreamResponse::error(
2201 StreamErrorCode::StreamNotFound,
2202 format!("stream '{stream_id}' does not exist"),
2203 ));
2204 };
2205 if stream.status == StreamStatus::Closed {
2206 return Err(StreamResponse::error_with_next_offset(
2207 StreamErrorCode::StreamClosed,
2208 format!("stream '{stream_id}' is closed"),
2209 stream.tail_offset,
2210 ));
2211 }
2212 let Some(content_type) = content_type else {
2213 return Err(StreamResponse::error(
2214 StreamErrorCode::MissingContentType,
2215 "append batch must include content type",
2216 ));
2217 };
2218 if content_type != stream.content_type {
2219 return Err(StreamResponse::error_with_next_offset(
2220 StreamErrorCode::ContentTypeMismatch,
2221 format!(
2222 "append content type '{content_type}' does not match stream content type '{}'",
2223 stream.content_type
2224 ),
2225 stream.tail_offset,
2226 ));
2227 }
2228 if payloads.iter().any(|payload| payload.is_empty()) {
2229 return Err(StreamResponse::error(
2230 StreamErrorCode::EmptyAppend,
2231 "append batch payloads must be non-empty",
2232 ));
2233 }
2234
2235 let mut items = Vec::with_capacity(payloads.len());
2236 for payload in payloads {
2237 let offset = stream.tail_offset;
2238 let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
2239 stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
2240 items.push(ProducerAppendRecord {
2241 start_offset: offset,
2242 next_offset: stream.tail_offset,
2243 closed: false,
2244 });
2245 }
2246 let last = items
2247 .last()
2248 .expect("payloads checked non-empty before append")
2249 .clone();
2250 renew_stream_ttl(stream, now_ms);
2251 if let Some(producer) = producer {
2252 self.record_producer_success(stream_id.clone(), producer, last.clone(), items.clone());
2253 }
2254 let payload_store = self
2255 .payloads
2256 .get_mut(&stream_id)
2257 .expect("payload vector exists for stream metadata");
2258 let hot_segments = self.hot_segments.entry(stream_id.clone()).or_default();
2259 for (item, payload) in items.iter().zip(payloads.iter()) {
2260 let payload_start = payload_store.len();
2261 payload_store.extend_from_slice(payload);
2262 let payload_end = payload_store.len();
2263 hot_segments.push(HotPayloadSegment {
2264 start_offset: item.start_offset,
2265 end_offset: item.next_offset,
2266 payload_start,
2267 payload_end,
2268 });
2269 }
2270 self.refresh_hot_start_offset(&stream_id);
2271 self.message_records
2272 .entry(stream_id.clone())
2273 .or_default()
2274 .extend(items.iter().map(|item| StreamMessageRecord {
2275 start_offset: item.start_offset,
2276 end_offset: item.next_offset,
2277 }));
2278 Ok(StreamBatchAppend {
2279 items: items
2280 .into_iter()
2281 .map(|item| StreamBatchAppendItem {
2282 offset: item.start_offset,
2283 next_offset: item.next_offset,
2284 closed: item.closed,
2285 deduplicated: false,
2286 })
2287 .collect(),
2288 deduplicated: false,
2289 })
2290 }
2291
2292 fn close(
2293 &mut self,
2294 stream_id: BucketStreamId,
2295 stream_seq: Option<String>,
2296 producer: Option<ProducerRequest>,
2297 now_ms: u64,
2298 ) -> StreamResponse {
2299 self.append_borrowed(AppendStreamInput {
2300 stream_id,
2301 content_type: None,
2302 payload: &[],
2303 close_after: true,
2304 stream_seq,
2305 producer,
2306 now_ms,
2307 })
2308 }
2309
2310 fn delete_stream(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
2311 if let Err(response) = self.validate_stream_scope(stream_id) {
2312 return response;
2313 }
2314 let Some(stream) = self.streams.get_mut(stream_id) else {
2315 return StreamResponse::error(
2316 StreamErrorCode::StreamNotFound,
2317 format!("stream '{stream_id}' does not exist"),
2318 );
2319 };
2320 if is_soft_deleted(stream) {
2321 return StreamResponse::error(
2322 StreamErrorCode::StreamGone,
2323 format!("stream '{stream_id}' is gone"),
2324 );
2325 }
2326 if stream.fork_ref_count > 0 {
2327 stream.status = StreamStatus::SoftDeleted;
2328 return StreamResponse::Deleted {
2329 hard_deleted: false,
2330 parent_to_release: None,
2331 };
2332 }
2333 let parent_to_release = stream.forked_from.clone();
2334 self.remove_stream_state(stream_id);
2335 StreamResponse::Deleted {
2336 hard_deleted: true,
2337 parent_to_release,
2338 }
2339 }
2340
2341 fn add_fork_ref(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> StreamResponse {
2342 if let Err(response) = self.validate_stream_scope(stream_id) {
2343 return response;
2344 }
2345 if self.expire_stream_if_due(stream_id, now_ms) {
2346 return StreamResponse::error(
2347 StreamErrorCode::StreamNotFound,
2348 format!("stream '{stream_id}' does not exist"),
2349 );
2350 }
2351 let Some(stream) = self.streams.get_mut(stream_id) else {
2352 return StreamResponse::error(
2353 StreamErrorCode::StreamNotFound,
2354 format!("stream '{stream_id}' does not exist"),
2355 );
2356 };
2357 if is_soft_deleted(stream) {
2358 return StreamResponse::error(
2359 StreamErrorCode::StreamGone,
2360 format!("stream '{stream_id}' is gone"),
2361 );
2362 }
2363 stream.fork_ref_count = stream.fork_ref_count.saturating_add(1);
2364 StreamResponse::ForkRefAdded {
2365 fork_ref_count: stream.fork_ref_count,
2366 }
2367 }
2368
2369 fn release_fork_ref(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
2370 if let Err(response) = self.validate_stream_scope(stream_id) {
2371 return response;
2372 }
2373 let Some(stream) = self.streams.get_mut(stream_id) else {
2374 return StreamResponse::ForkRefReleased {
2375 hard_deleted: false,
2376 fork_ref_count: 0,
2377 parent_to_release: None,
2378 };
2379 };
2380 if stream.fork_ref_count == 0 {
2381 return StreamResponse::error(
2382 StreamErrorCode::InvalidFork,
2383 format!("stream '{stream_id}' has no fork reference to release"),
2384 );
2385 }
2386 stream.fork_ref_count -= 1;
2387 if stream.fork_ref_count == 0 && is_soft_deleted(stream) {
2388 let parent_to_release = stream.forked_from.clone();
2389 self.remove_stream_state(stream_id);
2390 return StreamResponse::ForkRefReleased {
2391 hard_deleted: true,
2392 fork_ref_count: 0,
2393 parent_to_release,
2394 };
2395 }
2396 StreamResponse::ForkRefReleased {
2397 hard_deleted: false,
2398 fork_ref_count: stream.fork_ref_count,
2399 parent_to_release: None,
2400 }
2401 }
2402
2403 fn touch_stream_access(
2404 &mut self,
2405 stream_id: &BucketStreamId,
2406 now_ms: u64,
2407 renew_ttl: bool,
2408 ) -> StreamResponse {
2409 if let Err(response) = self.validate_stream_scope(stream_id) {
2410 return response;
2411 }
2412 let Some(stream) = self.streams.get(stream_id) else {
2413 return StreamResponse::error(
2414 StreamErrorCode::StreamNotFound,
2415 format!("stream '{stream_id}' does not exist"),
2416 );
2417 };
2418 if is_soft_deleted(stream) {
2419 return StreamResponse::error(
2420 StreamErrorCode::StreamGone,
2421 format!("stream '{stream_id}' is gone"),
2422 );
2423 }
2424 if stream_is_expired(stream, now_ms) {
2425 self.remove_stream_state(stream_id);
2426 return StreamResponse::Accessed {
2427 changed: true,
2428 expired: true,
2429 };
2430 }
2431 let changed = if renew_ttl && stream.stream_ttl_seconds.is_some() {
2432 let stream = self
2433 .streams
2434 .get_mut(stream_id)
2435 .expect("stream existence checked before TTL renewal");
2436 let previous = stream.last_ttl_touch_at_ms;
2437 renew_stream_ttl(stream, now_ms);
2438 stream.last_ttl_touch_at_ms != previous
2439 } else {
2440 false
2441 };
2442 StreamResponse::Accessed {
2443 changed,
2444 expired: false,
2445 }
2446 }
2447
2448 fn expire_stream_if_due(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> bool {
2449 if self
2450 .streams
2451 .get(stream_id)
2452 .is_some_and(|stream| stream_is_expired(stream, now_ms))
2453 {
2454 self.remove_stream_state(stream_id);
2455 return true;
2456 }
2457 false
2458 }
2459
2460 fn remove_stream_state(&mut self, stream_id: &BucketStreamId) -> bool {
2461 if self.streams.remove(stream_id).is_some() {
2462 self.payloads.remove(stream_id);
2463 self.hot_segments.remove(stream_id);
2464 self.hot_start_offsets.remove(stream_id);
2465 self.cold_chunks.remove(stream_id);
2466 self.external_segments.remove(stream_id);
2467 self.message_records.remove(stream_id);
2468 self.visible_snapshots.remove(stream_id);
2469 self.producers.remove(stream_id);
2470 true
2471 } else {
2472 false
2473 }
2474 }
2475
2476 fn validate_stream_scope(&self, stream_id: &BucketStreamId) -> Result<(), StreamResponse> {
2477 if let Err(message) = validate_bucket_id(&stream_id.bucket_id) {
2478 return Err(StreamResponse::error(
2479 StreamErrorCode::InvalidBucketId,
2480 message,
2481 ));
2482 }
2483 if let Err(message) = validate_stream_id(stream_id) {
2484 return Err(StreamResponse::error(
2485 StreamErrorCode::InvalidStreamId,
2486 message,
2487 ));
2488 }
2489 if !self.buckets.contains(&stream_id.bucket_id) {
2490 return Err(StreamResponse::error(
2491 StreamErrorCode::BucketNotFound,
2492 format!("bucket '{}' does not exist", stream_id.bucket_id),
2493 ));
2494 }
2495 Ok(())
2496 }
2497
2498 fn earliest_retained_offset(&self, stream_id: &BucketStreamId) -> u64 {
2499 self.visible_snapshots
2500 .get(stream_id)
2501 .map(|snapshot| snapshot.offset)
2502 .unwrap_or(0)
2503 }
2504
2505 fn snapshot_offset_aligned(
2506 &self,
2507 stream_id: &BucketStreamId,
2508 snapshot_offset: u64,
2509 retained_offset: u64,
2510 ) -> bool {
2511 snapshot_offset == retained_offset
2512 || self.message_records.get(stream_id).is_some_and(|records| {
2513 records
2514 .iter()
2515 .any(|record| record.end_offset == snapshot_offset)
2516 })
2517 }
2518
2519 fn compact_retained_prefix(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
2520 if let Some(records) = self.message_records.get_mut(stream_id) {
2521 records.retain(|record| record.end_offset > retained_offset);
2522 if records.is_empty() {
2523 self.message_records.remove(stream_id);
2524 }
2525 }
2526 if let Some(chunks) = self.cold_chunks.get_mut(stream_id) {
2527 chunks.retain(|chunk| chunk.end_offset > retained_offset);
2528 if chunks.is_empty() {
2529 self.cold_chunks.remove(stream_id);
2530 }
2531 }
2532 if let Some(objects) = self.external_segments.get_mut(stream_id) {
2533 objects.retain(|object| object.end_offset > retained_offset);
2534 if objects.is_empty() {
2535 self.external_segments.remove(stream_id);
2536 }
2537 }
2538
2539 self.discard_hot_prefix_before(stream_id, retained_offset);
2540 }
2541
2542 fn refresh_hot_start_offset(&mut self, stream_id: &BucketStreamId) {
2543 let Some(hot_start_offset) = self
2544 .hot_segments
2545 .get(stream_id)
2546 .and_then(|segments| segments.iter().map(|segment| segment.start_offset).min())
2547 else {
2548 self.hot_start_offsets.remove(stream_id);
2549 return;
2550 };
2551 if hot_start_offset == 0 {
2552 self.hot_start_offsets.remove(stream_id);
2553 } else {
2554 self.hot_start_offsets
2555 .insert(stream_id.clone(), hot_start_offset);
2556 }
2557 }
2558
    /// Repairs the hot segment table of `stream_id` after the caller has already
    /// removed `drained_len` bytes, starting at position `drain_start`, from the
    /// stream's payload buffer.
    ///
    /// Each segment is handled in order:
    /// - index below `segment_index`, or payload ending at or before
    ///   `drain_start`: kept unchanged;
    /// - payload starting at or after the end of the drained range: shifted
    ///   down by `drained_len`;
    /// - payload ending at or before the end of the drained range: dropped;
    /// - otherwise the segment straddles the end of the drained range: its
    ///   logical start becomes `new_start_offset` and its payload range is
    ///   rebased to begin at `drain_start`.
    ///
    /// NOTE(review): the last two rules assume any overlapping segment's payload
    /// begins exactly at `drain_start`. That holds for
    /// `discard_hot_prefix_before`, which drains from a segment's
    /// `payload_start`. A segment starting earlier would be wrongly dropped or
    /// mis-trimmed. This is not checked here; confirm before adding callers.
    ///
    /// The stream's hot segment entry is removed once empty, and the cached hot
    /// start offset is brought up to date on every path.
    fn remove_drained_hot_range(
        &mut self,
        stream_id: &BucketStreamId,
        segment_index: usize,
        new_start_offset: u64,
        drain_start: usize,
        drained_len: usize,
    ) {
        let Some(segments) = self.hot_segments.get_mut(stream_id) else {
            // No hot segments at all, so no cached start offset either.
            self.hot_start_offsets.remove(stream_id);
            return;
        };
        if segment_index >= segments.len() {
            self.refresh_hot_start_offset(stream_id);
            return;
        }
        let drain_end = drain_start + drained_len;
        let mut updated_segments = Vec::with_capacity(segments.len());
        for (index, mut segment) in segments.drain(..).enumerate() {
            // Untouched by the drain.
            if index < segment_index || segment.payload_end <= drain_start {
                updated_segments.push(segment);
                continue;
            }
            // Entirely after the drained bytes: only its buffer position moves.
            if segment.payload_start >= drain_end {
                segment.payload_start -= drained_len;
                segment.payload_end -= drained_len;
                updated_segments.push(segment);
                continue;
            }
            // Entirely within the drained bytes: nothing of it remains.
            if segment.payload_end <= drain_end {
                continue;
            }
            // Keeps its suffix: trim the logical start and rebase the payload.
            segment.start_offset = new_start_offset;
            segment.payload_start = drain_start;
            segment.payload_end -= drained_len;
            updated_segments.push(segment);
        }
        *segments = updated_segments;
        if segments.is_empty() {
            self.hot_segments.remove(stream_id);
        }
        self.refresh_hot_start_offset(stream_id);
    }
2602
    /// Drops hot payload bytes of `stream_id` whose logical offsets are below
    /// `retained_offset`.
    ///
    /// Each round picks the first hot segment, from the `hot_segments(stream_id)`
    /// accessor (defined elsewhere), that starts below `retained_offset`. It
    /// drains that segment's bytes up to `min(retained_offset, end_offset)` from
    /// the front of its payload range, then fixes the segment table with
    /// `remove_drained_hot_range`. The loop stops when no segment starts below
    /// the retained offset, or when a zero-length drain is computed.
    ///
    /// NOTE(review): every round rescans the segment list and drains from the
    /// payload `Vec`, which shifts all later bytes. Discarding many hot segments
    /// at once is therefore roughly quadratic. Consider draining the whole
    /// prefix in one pass if this shows up in profiles.
    fn discard_hot_prefix_before(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
        // Re-scan each round: the previous round rewrote the segment table.
        while let Some(segment_index) = self
            .hot_segments(stream_id)
            .iter()
            .position(|segment| segment.start_offset < retained_offset)
        {
            let segment = self.hot_segments(stream_id)[segment_index].clone();
            // Never drain past this segment; a segment straddling the retained
            // offset keeps its suffix.
            let new_start_offset = retained_offset.min(segment.end_offset);
            let drained_len = usize::try_from(new_start_offset - segment.start_offset)
                .expect("drain len fits usize");
            if drained_len == 0 {
                // Only reachable for a segment with end_offset == start_offset;
                // stop rather than loop forever.
                break;
            }
            // Assumes a segment's logical bytes map 1:1 onto its payload range
            // (as `hot_segments_match_payload` requires), so the logical prefix
            // is the front of that range.
            let drain_start = segment.payload_start;
            let drain_end = drain_start + drained_len;
            self.payloads
                .get_mut(stream_id)
                .expect("payload vector exists for stream metadata")
                .drain(drain_start..drain_end);
            self.remove_drained_hot_range(
                stream_id,
                segment_index,
                new_start_offset,
                drain_start,
                drained_len,
            );
        }
        self.refresh_hot_start_offset(stream_id);
    }
2632
2633 fn producer_snapshot(&self, stream_id: &BucketStreamId) -> Vec<ProducerSnapshot> {
2634 let mut producer_states = self
2635 .producers
2636 .get(stream_id)
2637 .into_iter()
2638 .flat_map(|states| states.iter())
2639 .map(|(producer_id, state)| ProducerSnapshot {
2640 producer_id: producer_id.clone(),
2641 producer_epoch: state.producer_epoch,
2642 producer_seq: state.producer_seq,
2643 last_start_offset: state.last_start_offset,
2644 last_next_offset: state.last_next_offset,
2645 last_closed: state.last_closed,
2646 last_items: state.last_items.clone(),
2647 })
2648 .collect::<Vec<_>>();
2649 producer_states.sort_by(|left, right| left.producer_id.cmp(&right.producer_id));
2650 producer_states
2651 }
2652
2653 fn evaluate_producer(
2654 &self,
2655 stream_id: &BucketStreamId,
2656 producer: Option<&ProducerRequest>,
2657 ) -> Result<ProducerDecision, StreamResponse> {
2658 let Some(producer) = producer else {
2659 return Ok(ProducerDecision::Accept);
2660 };
2661 let Some(states) = self.producers.get(stream_id) else {
2662 return Ok(ProducerDecision::Accept);
2663 };
2664 let Some(state) = states.get(&producer.producer_id) else {
2665 if producer.producer_seq == 0 {
2666 return Ok(ProducerDecision::Accept);
2667 }
2668 return Err(StreamResponse::error(
2669 StreamErrorCode::ProducerSeqConflict,
2670 format!(
2671 "producer '{}' expected sequence 0, received {}",
2672 producer.producer_id, producer.producer_seq
2673 ),
2674 ));
2675 };
2676
2677 if producer.producer_epoch < state.producer_epoch {
2678 return Err(StreamResponse::error(
2679 StreamErrorCode::ProducerEpochStale,
2680 format!(
2681 "producer '{}' epoch {} is stale; current epoch is {}",
2682 producer.producer_id, producer.producer_epoch, state.producer_epoch
2683 ),
2684 ));
2685 }
2686 if producer.producer_epoch > state.producer_epoch {
2687 if producer.producer_seq == 0 {
2688 return Ok(ProducerDecision::Accept);
2689 }
2690 return Err(StreamResponse::error(
2691 StreamErrorCode::InvalidProducer,
2692 format!(
2693 "producer '{}' new epoch {} must start at sequence 0",
2694 producer.producer_id, producer.producer_epoch
2695 ),
2696 ));
2697 }
2698
2699 if producer.producer_seq <= state.producer_seq {
2700 return Ok(ProducerDecision::Duplicate {
2701 offset: state.last_start_offset,
2702 next_offset: state.last_next_offset,
2703 closed: state.last_closed,
2704 producer: ProducerRequest {
2705 producer_id: producer.producer_id.clone(),
2706 producer_epoch: state.producer_epoch,
2707 producer_seq: state.producer_seq,
2708 },
2709 items: state.last_items.clone(),
2710 });
2711 }
2712 if producer.producer_seq == state.producer_seq + 1 {
2713 return Ok(ProducerDecision::Accept);
2714 }
2715 Err(StreamResponse::error(
2716 StreamErrorCode::ProducerSeqConflict,
2717 format!(
2718 "producer '{}' expected sequence {}, received {}",
2719 producer.producer_id,
2720 state.producer_seq + 1,
2721 producer.producer_seq
2722 ),
2723 ))
2724 }
2725
2726 fn record_producer_success(
2727 &mut self,
2728 stream_id: BucketStreamId,
2729 producer: ProducerRequest,
2730 last: ProducerAppendRecord,
2731 last_items: Vec<ProducerAppendRecord>,
2732 ) {
2733 self.producers.entry(stream_id).or_default().insert(
2734 producer.producer_id,
2735 ProducerState {
2736 producer_epoch: producer.producer_epoch,
2737 producer_seq: producer.producer_seq,
2738 last_start_offset: last.start_offset,
2739 last_next_offset: last.next_offset,
2740 last_closed: last.closed,
2741 last_items,
2742 },
2743 );
2744 }
2745}
2746
/// Owned arguments for creating a stream whose initial payload is supplied
/// inline. The fields mirror `StreamCommand::CreateStream`.
#[derive(Debug)]
struct CreateStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    /// Bytes to seed the stream with; may be empty.
    initial_payload: Vec<u8>,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    /// Sliding TTL in seconds. `validate_retention` rejects setting both this
    /// and `stream_expires_at_ms`.
    stream_ttl_seconds: Option<u64>,
    /// Absolute expiry time in milliseconds.
    stream_expires_at_ms: Option<u64>,
    /// Parent stream when this stream is created as a fork.
    forked_from: Option<BucketStreamId>,
    /// Presumably the parent offset the fork branches at; confirm with the
    /// create path.
    fork_offset: Option<u64>,
    now_ms: u64,
}
2761
/// Owned arguments for creating a stream whose initial payload lives in
/// external object storage. The fields mirror `StreamCommand::CreateExternal`.
#[derive(Debug)]
struct CreateExternalStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    /// Reference to the externally stored initial bytes (see
    /// `validate_external_payload_ref` for the checks it must pass).
    initial_payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    /// Sliding TTL in seconds. `validate_retention` rejects setting both this
    /// and `stream_expires_at_ms`.
    stream_ttl_seconds: Option<u64>,
    /// Absolute expiry time in milliseconds.
    stream_expires_at_ms: Option<u64>,
    /// Parent stream when this stream is created as a fork.
    forked_from: Option<BucketStreamId>,
    /// Presumably the parent offset the fork branches at; confirm with the
    /// create path.
    fork_offset: Option<u64>,
    now_ms: u64,
}
2776
/// Result of checking an append's producer metadata in `evaluate_producer`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProducerDecision {
    /// Process the append normally.
    Accept,
    /// The request repeats an already-accepted sequence. Answer it with the
    /// producer's last recorded result instead of appending again.
    Duplicate {
        /// Start offset of the producer's last recorded append.
        offset: u64,
        /// Offset just past the producer's last recorded append.
        next_offset: u64,
        /// Whether that recorded append closed the stream.
        closed: bool,
        /// Producer id echoed back with the *recorded* epoch and sequence, not
        /// necessarily the ones in the incoming request.
        producer: ProducerRequest,
        /// Per-item results of the last recorded request (one per payload for
        /// batch appends).
        items: Vec<ProducerAppendRecord>,
    },
}
2788
2789impl CreateStreamInput {
2790 fn initial_len(&self) -> u64 {
2791 u64::try_from(self.initial_payload.len()).expect("payload len fits u64")
2792 }
2793}
2794
2795fn status_from_closed(closed: bool) -> StreamStatus {
2796 if closed {
2797 StreamStatus::Closed
2798 } else {
2799 StreamStatus::Open
2800 }
2801}
2802
2803fn is_soft_deleted(stream: &StreamMetadata) -> bool {
2804 stream.status == StreamStatus::SoftDeleted
2805}
2806
2807fn validate_retention(
2808 stream_ttl_seconds: Option<u64>,
2809 stream_expires_at_ms: Option<u64>,
2810) -> Result<(), StreamResponse> {
2811 if stream_ttl_seconds.is_some() && stream_expires_at_ms.is_some() {
2812 return Err(StreamResponse::error(
2813 StreamErrorCode::InvalidRetention,
2814 "stream ttl and expires-at cannot both be set",
2815 ));
2816 }
2817 if let Some(ttl_seconds) = stream_ttl_seconds
2818 && ttl_seconds.checked_mul(1000).is_none()
2819 {
2820 return Err(StreamResponse::error(
2821 StreamErrorCode::InvalidRetention,
2822 "stream ttl overflows millisecond range",
2823 ));
2824 }
2825 Ok(())
2826}
2827
2828fn stream_expiry_at_ms(stream: &StreamMetadata) -> Option<u64> {
2829 if let Some(expires_at_ms) = stream.stream_expires_at_ms {
2830 return Some(expires_at_ms);
2831 }
2832 stream.stream_ttl_seconds.map(|ttl_seconds| {
2833 stream
2834 .last_ttl_touch_at_ms
2835 .saturating_add(ttl_seconds.saturating_mul(1000))
2836 })
2837}
2838
2839fn stream_is_expired(stream: &StreamMetadata, now_ms: u64) -> bool {
2840 stream_expiry_at_ms(stream).is_some_and(|expires_at_ms| now_ms >= expires_at_ms)
2841}
2842
2843fn renew_stream_ttl(stream: &mut StreamMetadata, now_ms: u64) {
2844 if stream.stream_ttl_seconds.is_some() && stream.stream_expires_at_ms.is_none() {
2845 stream.last_ttl_touch_at_ms = now_ms;
2846 }
2847}
2848
2849fn check_stream_seq(stream: &StreamMetadata, incoming: Option<&str>) -> Result<(), StreamResponse> {
2850 let Some(incoming) = incoming else {
2851 return Ok(());
2852 };
2853 if let Some(last) = stream.last_stream_seq.as_deref()
2854 && incoming <= last
2855 {
2856 return Err(StreamResponse::error_with_next_offset(
2857 StreamErrorCode::StreamSeqConflict,
2858 format!("stream sequence '{incoming}' is not greater than last sequence '{last}'"),
2859 stream.tail_offset,
2860 ));
2861 }
2862 Ok(())
2863}
2864
2865fn validate_producer_request(producer: Option<&ProducerRequest>) -> Result<(), StreamResponse> {
2866 let Some(producer) = producer else {
2867 return Ok(());
2868 };
2869 if producer.producer_id.trim().is_empty() {
2870 return Err(StreamResponse::error(
2871 StreamErrorCode::InvalidProducer,
2872 "producer id must not be empty",
2873 ));
2874 }
2875 const MAX_JS_SAFE_INTEGER: u64 = 9_007_199_254_740_991;
2876 if producer.producer_epoch > MAX_JS_SAFE_INTEGER {
2877 return Err(StreamResponse::error(
2878 StreamErrorCode::InvalidProducer,
2879 format!(
2880 "producer epoch {} exceeds maximum {}",
2881 producer.producer_epoch, MAX_JS_SAFE_INTEGER
2882 ),
2883 ));
2884 }
2885 if producer.producer_seq > MAX_JS_SAFE_INTEGER {
2886 return Err(StreamResponse::error(
2887 StreamErrorCode::InvalidProducer,
2888 format!(
2889 "producer sequence {} exceeds maximum {}",
2890 producer.producer_seq, MAX_JS_SAFE_INTEGER
2891 ),
2892 ));
2893 }
2894 Ok(())
2895}
2896
2897fn validate_external_payload_ref(payload: &ExternalPayloadRef) -> Result<(), StreamResponse> {
2898 if payload.s3_path.trim().is_empty() {
2899 return Err(StreamResponse::error(
2900 StreamErrorCode::InvalidColdFlush,
2901 "external payload S3 path must not be empty",
2902 ));
2903 }
2904 if payload.payload_len == 0 {
2905 return Err(StreamResponse::error(
2906 StreamErrorCode::EmptyAppend,
2907 "external payload length must be greater than zero",
2908 ));
2909 }
2910 if payload.object_size < payload.payload_len {
2911 return Err(StreamResponse::error(
2912 StreamErrorCode::InvalidColdFlush,
2913 "external payload object size must cover payload length",
2914 ));
2915 }
2916 Ok(())
2917}
2918
2919fn restore_producer_states(
2920 stream_id: &BucketStreamId,
2921 snapshots: Vec<ProducerSnapshot>,
2922) -> Result<HashMap<String, ProducerState>, StreamSnapshotError> {
2923 let mut states = HashMap::with_capacity(snapshots.len());
2924 for snapshot in snapshots {
2925 if states
2926 .insert(
2927 snapshot.producer_id.clone(),
2928 ProducerState {
2929 producer_epoch: snapshot.producer_epoch,
2930 producer_seq: snapshot.producer_seq,
2931 last_start_offset: snapshot.last_start_offset,
2932 last_next_offset: snapshot.last_next_offset,
2933 last_closed: snapshot.last_closed,
2934 last_items: snapshot.last_items,
2935 },
2936 )
2937 .is_some()
2938 {
2939 return Err(StreamSnapshotError::DuplicateProducer {
2940 stream_id: stream_id.clone(),
2941 producer_id: snapshot.producer_id,
2942 });
2943 }
2944 }
2945 Ok(states)
2946}
2947
2948fn valid_cold_chunk_ref(chunk: &ColdChunkRef) -> bool {
2949 chunk.end_offset > chunk.start_offset
2950 && !chunk.s3_path.trim().is_empty()
2951 && chunk.object_size >= chunk.end_offset - chunk.start_offset
2952}
2953
2954fn valid_object_payload_ref(object: &ObjectPayloadRef) -> bool {
2955 object.end_offset > object.start_offset
2956 && !object.s3_path.trim().is_empty()
2957 && object.object_size >= object.end_offset - object.start_offset
2958}
2959
2960fn hot_segments_match_payload(segments: &[HotPayloadSegment], payload_len: usize) -> bool {
2961 let mut expected_payload_start = 0;
2962 for segment in segments {
2963 if segment.end_offset <= segment.start_offset
2964 || segment.payload_start != expected_payload_start
2965 || segment.payload_end <= segment.payload_start
2966 || segment.payload_end > payload_len
2967 {
2968 return false;
2969 }
2970 let Ok(logical_len) = usize::try_from(segment.end_offset - segment.start_offset) else {
2971 return false;
2972 };
2973 if logical_len != segment.payload_end - segment.payload_start {
2974 return false;
2975 }
2976 expected_payload_start = segment.payload_end;
2977 }
2978 expected_payload_start == payload_len
2979}
2980
2981fn payload_sources_cover_retained_suffix(
2982 cold_chunks: &[ColdChunkRef],
2983 external_segments: &[ObjectPayloadRef],
2984 hot_segments: &[HotPayloadSegment],
2985 retained_offset: u64,
2986 tail_offset: u64,
2987) -> bool {
2988 if tail_offset < retained_offset {
2989 return false;
2990 }
2991 let mut ranges =
2992 Vec::with_capacity(cold_chunks.len() + external_segments.len() + hot_segments.len());
2993 for chunk in cold_chunks {
2994 if !valid_cold_chunk_ref(chunk) {
2995 return false;
2996 }
2997 ranges.push((chunk.start_offset, chunk.end_offset));
2998 }
2999 for object in external_segments {
3000 if !valid_object_payload_ref(object) {
3001 return false;
3002 }
3003 ranges.push((object.start_offset, object.end_offset));
3004 }
3005 for segment in hot_segments {
3006 if segment.end_offset <= segment.start_offset {
3007 return false;
3008 }
3009 ranges.push((segment.start_offset, segment.end_offset));
3010 }
3011 ranges.sort_unstable();
3012
3013 let mut expected_start = retained_offset;
3014 for (start_offset, end_offset) in ranges {
3015 if end_offset <= expected_start {
3016 continue;
3017 }
3018 if start_offset > expected_start {
3019 return false;
3020 }
3021 expected_start = end_offset;
3022 if expected_start >= tail_offset {
3023 return true;
3024 }
3025 }
3026 expected_start == tail_offset
3027}
3028
3029fn segments_cover_range(
3030 segments: &[(u64, StreamReadSegment)],
3031 offset: u64,
3032 next_offset: u64,
3033) -> bool {
3034 if next_offset < offset {
3035 return false;
3036 }
3037 let mut expected_start = offset;
3038 for (segment_start, segment) in segments {
3039 let Some(segment_end) = read_segment_end(*segment_start, segment) else {
3040 return false;
3041 };
3042 if segment_end <= expected_start {
3043 continue;
3044 }
3045 if *segment_start > expected_start {
3046 return false;
3047 }
3048 expected_start = segment_end;
3049 if expected_start >= next_offset {
3050 return true;
3051 }
3052 }
3053 expected_start == next_offset
3054}
3055
3056fn read_segment_end(segment_start: u64, segment: &StreamReadSegment) -> Option<u64> {
3057 match segment {
3058 StreamReadSegment::Object(object) => {
3059 if object.len == 0
3060 || object.read_start_offset != segment_start
3061 || object.read_start_offset < object.object.start_offset
3062 {
3063 return None;
3064 }
3065 let len = u64::try_from(object.len).ok()?;
3066 let segment_end = object.read_start_offset.checked_add(len)?;
3067 if segment_end > object.object.end_offset {
3068 return None;
3069 }
3070 Some(segment_end)
3071 }
3072 StreamReadSegment::Hot(payload) => {
3073 if payload.is_empty() {
3074 return None;
3075 }
3076 let len = u64::try_from(payload.len()).ok()?;
3077 segment_start.checked_add(len)
3078 }
3079 }
3080}
3081
3082fn message_records_cover_retained_suffix(
3083 records: &[StreamMessageRecord],
3084 retained_offset: u64,
3085 tail_offset: u64,
3086) -> bool {
3087 let mut expected_start = retained_offset;
3088 for record in records {
3089 if record.start_offset != expected_start || record.end_offset <= record.start_offset {
3090 return false;
3091 }
3092 expected_start = record.end_offset;
3093 }
3094 expected_start == tail_offset
3095}
3096
3097fn compare_stream_ids(left: &BucketStreamId, right: &BucketStreamId) -> std::cmp::Ordering {
3098 left.bucket_id
3099 .cmp(&right.bucket_id)
3100 .then_with(|| left.stream_id.cmp(&right.stream_id))
3101}
3102
/// Validates a bucket id: 4 to 64 bytes drawn from `[a-z0-9_-]`.
///
/// # Errors
/// Returns a human-readable message for the first rule that fails. Length is
/// checked before the character set.
pub fn validate_bucket_id(bucket_id: &str) -> Result<(), String> {
    const MIN_LEN: usize = 4;
    const MAX_LEN: usize = 64;
    let len = bucket_id.len();
    if !(MIN_LEN..=MAX_LEN).contains(&len) {
        return Err(format!("bucket_id must be 4 to 64 bytes, got {len} bytes"));
    }
    let allowed = |byte: u8| matches!(byte, b'a'..=b'z' | b'0'..=b'9' | b'_' | b'-');
    if bucket_id.bytes().any(|byte| !allowed(byte)) {
        return Err("bucket_id must match ^[a-z0-9_-]{4,64}$".to_owned());
    }
    Ok(())
}
3117
3118pub fn validate_stream_id(stream_id: &BucketStreamId) -> Result<(), String> {
3119 let local = stream_id.stream_id.as_str();
3120 if local.is_empty() {
3121 return Err("stream_id must not be empty".to_owned());
3122 }
3123 if local.len() > 122 {
3124 return Err(format!(
3125 "stream_id must not exceed 122 bytes, got {} bytes",
3126 local.len()
3127 ));
3128 }
3129 if local == "streams" {
3130 return Err("stream_id 'streams' is reserved".to_owned());
3131 }
3132 if local.contains('/') || local.contains('\0') || local.contains("..") {
3133 return Err("stream_id must not contain '/', NUL, or '..'".to_owned());
3134 }
3135 let combined_len = stream_id.bucket_id.len() + 1 + local.len();
3136 if combined_len > 122 {
3137 return Err(format!(
3138 "bucket_id/stream_id must not exceed 122 bytes, got {combined_len} bytes"
3139 ));
3140 }
3141 Ok(())
3142}
3143
3144#[cfg(test)]
3145mod tests {
3146 use super::*;
3147
3148 fn stream(id: &str) -> BucketStreamId {
3149 BucketStreamId::new("benchcmp", id)
3150 }
3151
3152 fn create_bucket(machine: &mut StreamStateMachine) {
3153 assert_eq!(
3154 machine.apply(StreamCommand::CreateBucket {
3155 bucket_id: "benchcmp".to_owned(),
3156 }),
3157 StreamResponse::BucketCreated {
3158 bucket_id: "benchcmp".to_owned(),
3159 }
3160 );
3161 }
3162
3163 fn create_stream(machine: &mut StreamStateMachine, id: &str) {
3164 assert_eq!(
3165 machine.apply(StreamCommand::CreateStream {
3166 stream_id: stream(id),
3167 content_type: "application/octet-stream".to_owned(),
3168 initial_payload: Vec::new(),
3169 close_after: false,
3170 stream_seq: None,
3171 producer: None,
3172 stream_ttl_seconds: None,
3173 stream_expires_at_ms: None,
3174 forked_from: None,
3175 fork_offset: None,
3176 now_ms: 0,
3177 }),
3178 StreamResponse::Created {
3179 stream_id: stream(id),
3180 next_offset: 0,
3181 closed: false,
3182 }
3183 );
3184 }
3185
3186 fn producer(id: &str, epoch: u64, seq: u64) -> ProducerRequest {
3187 ProducerRequest {
3188 producer_id: id.to_owned(),
3189 producer_epoch: epoch,
3190 producer_seq: seq,
3191 }
3192 }
3193
    /// Stream creation checks scope: a malformed bucket id is rejected, creating
    /// a stream before its bucket exists fails, and the reserved stream name
    /// `streams` is refused even once the bucket exists.
    #[test]
    fn stream_create_requires_existing_bucket_and_valid_ids() {
        let mut machine = StreamStateMachine::new();

        // "Bad" breaks the bucket-id rules (only 3 bytes, and uppercase).
        assert!(matches!(
            machine.apply(StreamCommand::CreateBucket {
                bucket_id: "Bad".to_owned(),
            }),
            StreamResponse::Error {
                code: StreamErrorCode::InvalidBucketId,
                ..
            }
        ));
        // The well-formed "benchcmp" bucket has not been created yet.
        assert!(matches!(
            machine.apply(StreamCommand::CreateStream {
                stream_id: stream("s-1"),
                content_type: "application/octet-stream".to_owned(),
                initial_payload: Vec::new(),
                close_after: false,
                stream_seq: None,
                producer: None,
                stream_ttl_seconds: None,
                stream_expires_at_ms: None,
                forked_from: None,
                fork_offset: None,
                now_ms: 0,
            }),
            StreamResponse::Error {
                code: StreamErrorCode::BucketNotFound,
                ..
            }
        ));

        create_bucket(&mut machine);
        // "streams" is a reserved stream id.
        assert!(matches!(
            machine.apply(StreamCommand::CreateStream {
                stream_id: stream("streams"),
                content_type: "application/octet-stream".to_owned(),
                initial_payload: Vec::new(),
                close_after: false,
                stream_seq: None,
                producer: None,
                stream_ttl_seconds: None,
                stream_expires_at_ms: None,
                forked_from: None,
                fork_offset: None,
                now_ms: 0,
            }),
            StreamResponse::Error {
                code: StreamErrorCode::InvalidStreamId,
                ..
            }
        ));
    }
3248
    /// Re-creating an existing stream with matching metadata reports
    /// `AlreadyExists` (the differing initial payload is not appended), while a
    /// different content type is a conflict.
    #[test]
    fn create_stream_is_idempotent_only_when_metadata_matches() {
        let mut machine = StreamStateMachine::new();
        create_bucket(&mut machine);
        create_stream(&mut machine, "s-1");

        // Same content type and retention: the 99-byte payload is ignored and
        // the tail stays at 0.
        assert_eq!(
            machine.apply(StreamCommand::CreateStream {
                stream_id: stream("s-1"),
                content_type: "application/octet-stream".to_owned(),
                initial_payload: vec![0; 99],
                close_after: false,
                stream_seq: None,
                producer: None,
                stream_ttl_seconds: None,
                stream_expires_at_ms: None,
                forked_from: None,
                fork_offset: None,
                now_ms: 0,
            }),
            StreamResponse::AlreadyExists {
                next_offset: 0,
                closed: false,
                content_type: "application/octet-stream".to_owned(),
                stream_ttl_seconds: None,
                stream_expires_at_ms: None,
            }
        );

        // A different content type for the same id is rejected.
        assert!(matches!(
            machine.apply(StreamCommand::CreateStream {
                stream_id: stream("s-1"),
                content_type: "text/plain".to_owned(),
                initial_payload: Vec::new(),
                close_after: false,
                stream_seq: None,
                producer: None,
                stream_ttl_seconds: None,
                stream_expires_at_ms: None,
                forked_from: None,
                fork_offset: None,
                now_ms: 0,
            }),
            StreamResponse::Error {
                code: StreamErrorCode::StreamAlreadyExistsConflict,
                ..
            }
        ));
    }
3298
    /// An append advances the tail by the payload length. An append with a
    /// mismatched content type fails, reports the current tail, and leaves the
    /// stream unchanged.
    #[test]
    fn append_advances_offsets_and_checks_content_type() {
        let mut machine = StreamStateMachine::new();
        create_bucket(&mut machine);
        create_stream(&mut machine, "s-1");

        // 7 bytes appended at offset 0 -> next offset 7.
        assert_eq!(
            machine.apply(StreamCommand::Append {
                stream_id: stream("s-1"),
                content_type: Some("application/octet-stream".to_owned()),
                payload: b"abcdefg".to_vec(),
                close_after: false,
                stream_seq: None,
                producer: None,
                now_ms: 0,
            }),
            StreamResponse::Appended {
                offset: 0,
                next_offset: 7,
                closed: false,
                deduplicated: false,
                producer: None,
            }
        );
        // Wrong content type: error carries the unchanged tail offset.
        assert!(matches!(
            machine.apply(StreamCommand::Append {
                stream_id: stream("s-1"),
                content_type: Some("text/plain".to_owned()),
                payload: b"x".to_vec(),
                close_after: false,
                stream_seq: None,
                producer: None,
                now_ms: 0,
            }),
            StreamResponse::Error {
                code: StreamErrorCode::ContentTypeMismatch,
                next_offset: Some(7),
                ..
            }
        ));
        // The rejected append must not have moved the tail.
        assert_eq!(machine.head(&stream("s-1")).expect("stream").tail_offset, 7);
    }
3341
3342 #[test]
3343 fn catch_up_read_returns_payload_slice_and_bounds_errors() {
3344 let mut machine = StreamStateMachine::new();
3345 create_bucket(&mut machine);
3346 create_stream(&mut machine, "s-1");
3347 assert!(matches!(
3348 machine.apply(StreamCommand::Append {
3349 stream_id: stream("s-1"),
3350 content_type: Some("application/octet-stream".to_owned()),
3351 payload: b"abcdefg".to_vec(),
3352 close_after: false,
3353 stream_seq: None,
3354 producer: None,
3355 now_ms: 0,
3356 }),
3357 StreamResponse::Appended { .. }
3358 ));
3359
3360 assert_eq!(
3361 machine.read(&stream("s-1"), 2, 3).expect("read"),
3362 StreamRead {
3363 offset: 2,
3364 next_offset: 5,
3365 content_type: "application/octet-stream".to_owned(),
3366 payload: b"cde".to_vec(),
3367 up_to_date: false,
3368 closed: false,
3369 }
3370 );
3371 assert_eq!(
3372 machine.read(&stream("s-1"), 7, 16).expect("tail read"),
3373 StreamRead {
3374 offset: 7,
3375 next_offset: 7,
3376 content_type: "application/octet-stream".to_owned(),
3377 payload: Vec::new(),
3378 up_to_date: true,
3379 closed: false,
3380 }
3381 );
3382 assert!(matches!(
3383 machine.read(&stream("s-1"), 8, 1),
3384 Err(StreamResponse::Error {
3385 code: StreamErrorCode::OffsetOutOfRange,
3386 next_offset: Some(7),
3387 ..
3388 })
3389 ));
3390 }
3391
3392 #[test]
3393 fn flush_cold_moves_hot_prefix_to_manifest_and_read_plan_splits() {
3394 let mut machine = StreamStateMachine::new();
3395 create_bucket(&mut machine);
3396 create_stream(&mut machine, "cold");
3397 assert!(matches!(
3398 machine.apply(StreamCommand::Append {
3399 stream_id: stream("cold"),
3400 content_type: Some("application/octet-stream".to_owned()),
3401 payload: b"abcdef".to_vec(),
3402 close_after: false,
3403 stream_seq: None,
3404 producer: None,
3405 now_ms: 0,
3406 }),
3407 StreamResponse::Appended {
3408 offset: 0,
3409 next_offset: 6,
3410 ..
3411 }
3412 ));
3413
3414 let candidate = machine
3415 .plan_cold_flush(&stream("cold"), 4, 4)
3416 .expect("plan cold flush")
3417 .expect("cold flush candidate");
3418 assert_eq!(candidate.start_offset, 0);
3419 assert_eq!(candidate.end_offset, 4);
3420 assert_eq!(candidate.payload, b"abcd");
3421 assert_eq!(
3422 machine.apply(StreamCommand::FlushCold {
3423 stream_id: stream("cold"),
3424 chunk: ColdChunkRef {
3425 start_offset: candidate.start_offset,
3426 end_offset: candidate.end_offset,
3427 s3_path: "s3://bucket/cold/000000".to_owned(),
3428 object_size: u64::try_from(candidate.payload.len()).unwrap(),
3429 },
3430 }),
3431 StreamResponse::ColdFlushed {
3432 hot_start_offset: 4,
3433 }
3434 );
3435 assert_eq!(machine.hot_start_offset(&stream("cold")), 4);
3436 assert_eq!(machine.cold_chunks(&stream("cold")).len(), 1);
3437
3438 let plan = machine.read_plan(&stream("cold"), 2, 4).expect("read plan");
3439 assert_eq!(plan.next_offset, 6);
3440 assert_eq!(plan.segments.len(), 2);
3441 match &plan.segments[0] {
3442 StreamReadSegment::Object(segment) => {
3443 assert_eq!(segment.read_start_offset, 2);
3444 assert_eq!(segment.len, 2);
3445 }
3446 other => panic!("expected cold object segment, got {other:?}"),
3447 }
3448 match &plan.segments[1] {
3449 StreamReadSegment::Hot(payload) => assert_eq!(payload, b"ef"),
3450 other => panic!("expected hot segment, got {other:?}"),
3451 }
3452 assert_eq!(
3453 machine.read(&stream("cold"), 0, 6),
3454 Err(StreamResponse::Error {
3455 code: StreamErrorCode::InvalidColdFlush,
3456 message: "stream 'benchcmp/cold' read requires object payload store".to_owned(),
3457 next_offset: Some(6),
3458 })
3459 );
3460 assert_eq!(
3461 machine.read(&stream("cold"), 4, 8).expect("hot read"),
3462 StreamRead {
3463 offset: 4,
3464 next_offset: 6,
3465 content_type: "application/octet-stream".to_owned(),
3466 payload: b"ef".to_vec(),
3467 up_to_date: true,
3468 closed: false,
3469 }
3470 );
3471
3472 let restored = StreamStateMachine::restore(machine.snapshot()).expect("restore snapshot");
3473 assert_eq!(restored.hot_start_offset(&stream("cold")), 4);
3474 assert_eq!(restored.cold_chunks(&stream("cold")).len(), 1);
3475 assert_eq!(
3476 restored.read(&stream("cold"), 4, 8).expect("hot read"),
3477 StreamRead {
3478 offset: 4,
3479 next_offset: 6,
3480 content_type: "application/octet-stream".to_owned(),
3481 payload: b"ef".to_vec(),
3482 up_to_date: true,
3483 closed: false,
3484 }
3485 );
3486 }
3487
3488 #[test]
3489 fn flush_cold_can_coalesce_contiguous_hot_segments() {
3490 let mut machine = StreamStateMachine::new();
3491 create_bucket(&mut machine);
3492 create_stream(&mut machine, "cold-coalesced");
3493 assert!(matches!(
3494 machine.apply(StreamCommand::Append {
3495 stream_id: stream("cold-coalesced"),
3496 content_type: Some("application/octet-stream".to_owned()),
3497 payload: b"abc".to_vec(),
3498 close_after: false,
3499 stream_seq: None,
3500 producer: None,
3501 now_ms: 0,
3502 }),
3503 StreamResponse::Appended {
3504 offset: 0,
3505 next_offset: 3,
3506 ..
3507 }
3508 ));
3509 assert!(matches!(
3510 machine.apply(StreamCommand::Append {
3511 stream_id: stream("cold-coalesced"),
3512 content_type: Some("application/octet-stream".to_owned()),
3513 payload: b"de".to_vec(),
3514 close_after: false,
3515 stream_seq: None,
3516 producer: None,
3517 now_ms: 0,
3518 }),
3519 StreamResponse::Appended {
3520 offset: 3,
3521 next_offset: 5,
3522 ..
3523 }
3524 ));
3525
3526 assert_eq!(
3527 machine.apply(StreamCommand::FlushCold {
3528 stream_id: stream("cold-coalesced"),
3529 chunk: ColdChunkRef {
3530 start_offset: 0,
3531 end_offset: 5,
3532 s3_path: "s3://bucket/cold-coalesced/000000".to_owned(),
3533 object_size: 5,
3534 },
3535 }),
3536 StreamResponse::ColdFlushed {
3537 hot_start_offset: 0,
3538 }
3539 );
3540 assert!(machine.hot_segments(&stream("cold-coalesced")).is_empty());
3541 assert_eq!(machine.hot_payload_len(&stream("cold-coalesced")), Ok(0));
3542 assert_eq!(machine.cold_chunks(&stream("cold-coalesced")).len(), 1);
3543
3544 let plan = machine
3545 .read_plan(&stream("cold-coalesced"), 0, 5)
3546 .expect("read plan");
3547 assert_eq!(plan.next_offset, 5);
3548 assert_eq!(plan.segments.len(), 1);
3549 match &plan.segments[0] {
3550 StreamReadSegment::Object(segment) => {
3551 assert_eq!(segment.read_start_offset, 0);
3552 assert_eq!(segment.len, 5);
3553 }
3554 other => panic!("expected cold object segment, got {other:?}"),
3555 }
3556 }
3557
3558 #[test]
3559 fn plan_cold_flush_coalesces_contiguous_hot_segments() {
3560 let mut machine = StreamStateMachine::new();
3561 create_bucket(&mut machine);
3562 create_stream(&mut machine, "cold-planned-coalesced");
3563 for payload in [b"ab".as_slice(), b"cd".as_slice(), b"ef".as_slice()] {
3564 assert!(matches!(
3565 machine.apply(StreamCommand::Append {
3566 stream_id: stream("cold-planned-coalesced"),
3567 content_type: Some("application/octet-stream".to_owned()),
3568 payload: payload.to_vec(),
3569 close_after: false,
3570 stream_seq: None,
3571 producer: None,
3572 now_ms: 0,
3573 }),
3574 StreamResponse::Appended { .. }
3575 ));
3576 }
3577
3578 assert!(
3579 machine
3580 .plan_cold_flush(&stream("cold-planned-coalesced"), 4, 4)
3581 .expect("plan cold flush")
3582 .is_some(),
3583 "planner should consider contiguous small hot segments together"
3584 );
3585 let candidate = machine
3586 .plan_cold_flush(&stream("cold-planned-coalesced"), 5, 5)
3587 .expect("plan cold flush")
3588 .expect("candidate");
3589 assert_eq!(candidate.start_offset, 0);
3590 assert_eq!(candidate.end_offset, 5);
3591 assert_eq!(candidate.payload, b"abcde");
3592 }
3593
3594 #[test]
3595 fn plan_next_cold_flush_selects_deterministic_eligible_stream() {
3596 let mut machine = StreamStateMachine::new();
3597 create_bucket(&mut machine);
3598 create_stream(&mut machine, "z-cold");
3599 create_stream(&mut machine, "a-cold");
3600 assert!(matches!(
3601 machine.apply(StreamCommand::Append {
3602 stream_id: stream("z-cold"),
3603 content_type: Some("application/octet-stream".to_owned()),
3604 payload: b"zzzz".to_vec(),
3605 close_after: false,
3606 stream_seq: None,
3607 producer: None,
3608 now_ms: 0,
3609 }),
3610 StreamResponse::Appended { .. }
3611 ));
3612 assert!(matches!(
3613 machine.apply(StreamCommand::Append {
3614 stream_id: stream("a-cold"),
3615 content_type: Some("application/octet-stream".to_owned()),
3616 payload: b"aaaa".to_vec(),
3617 close_after: false,
3618 stream_seq: None,
3619 producer: None,
3620 now_ms: 0,
3621 }),
3622 StreamResponse::Appended { .. }
3623 ));
3624
3625 let candidate = machine
3626 .plan_next_cold_flush(4, 4)
3627 .expect("plan next cold flush")
3628 .expect("candidate");
3629 assert_eq!(candidate.stream_id, stream("a-cold"));
3630 assert_eq!(candidate.payload, b"aaaa");
3631 }
3632
3633 #[test]
3634 fn plan_next_cold_flush_batch_advances_on_preview_state() {
3635 let mut machine = StreamStateMachine::new();
3636 create_bucket(&mut machine);
3637 create_stream(&mut machine, "batched-cold");
3638 assert!(matches!(
3639 machine.apply(StreamCommand::Append {
3640 stream_id: stream("batched-cold"),
3641 content_type: Some("application/octet-stream".to_owned()),
3642 payload: b"abcd".to_vec(),
3643 close_after: false,
3644 stream_seq: None,
3645 producer: None,
3646 now_ms: 0,
3647 }),
3648 StreamResponse::Appended { .. }
3649 ));
3650
3651 let candidates = machine
3652 .plan_next_cold_flush_batch(1, 1, 4)
3653 .expect("plan cold flush batch");
3654 assert_eq!(candidates.len(), 4);
3655 assert_eq!(
3656 candidates
3657 .iter()
3658 .map(|candidate| (candidate.start_offset, candidate.end_offset))
3659 .collect::<Vec<_>>(),
3660 vec![(0, 1), (1, 2), (2, 3), (3, 4)]
3661 );
3662 assert_eq!(
3663 candidates
3664 .iter()
3665 .map(|candidate| candidate.payload.as_slice())
3666 .collect::<Vec<_>>(),
3667 vec![
3668 b"a".as_slice(),
3669 b"b".as_slice(),
3670 b"c".as_slice(),
3671 b"d".as_slice()
3672 ]
3673 );
3674 assert_eq!(machine.hot_payload_len(&stream("batched-cold")), Ok(4));
3675 assert!(machine.cold_chunks(&stream("batched-cold")).is_empty());
3676 }
3677
3678 #[test]
3679 fn stale_cold_flush_candidate_after_delete_recreate_is_invalid_without_mutation() {
3680 let mut machine = StreamStateMachine::new();
3681 create_bucket(&mut machine);
3682 create_stream(&mut machine, "stale-cold");
3683 assert!(matches!(
3684 machine.apply(StreamCommand::Append {
3685 stream_id: stream("stale-cold"),
3686 content_type: Some("application/octet-stream".to_owned()),
3687 payload: b"abcdefghijklmnopqr".to_vec(),
3688 close_after: false,
3689 stream_seq: None,
3690 producer: None,
3691 now_ms: 0,
3692 }),
3693 StreamResponse::Appended {
3694 next_offset: 18,
3695 ..
3696 }
3697 ));
3698 let candidate = machine
3699 .plan_cold_flush(&stream("stale-cold"), 18, 18)
3700 .expect("plan cold flush")
3701 .expect("candidate");
3702
3703 assert!(matches!(
3704 machine.apply(StreamCommand::DeleteStream {
3705 stream_id: stream("stale-cold")
3706 }),
3707 StreamResponse::Deleted {
3708 hard_deleted: true,
3709 ..
3710 }
3711 ));
3712 create_stream(&mut machine, "stale-cold");
3713 assert!(matches!(
3714 machine.apply(StreamCommand::Append {
3715 stream_id: stream("stale-cold"),
3716 content_type: Some("application/octet-stream".to_owned()),
3717 payload: b"abcdefghijklmnopq".to_vec(),
3718 close_after: false,
3719 stream_seq: None,
3720 producer: None,
3721 now_ms: 0,
3722 }),
3723 StreamResponse::Appended {
3724 next_offset: 17,
3725 ..
3726 }
3727 ));
3728
3729 match machine.apply(StreamCommand::FlushCold {
3730 stream_id: stream("stale-cold"),
3731 chunk: ColdChunkRef {
3732 start_offset: candidate.start_offset,
3733 end_offset: candidate.end_offset,
3734 s3_path: "s3://bucket/stale-cold/old-candidate".to_owned(),
3735 object_size: u64::try_from(candidate.payload.len()).unwrap(),
3736 },
3737 }) {
3738 StreamResponse::Error {
3739 code: StreamErrorCode::InvalidColdFlush,
3740 message,
3741 next_offset: Some(17),
3742 } => assert!(message.contains("beyond stream")),
3743 other => panic!("expected stale invalid cold flush, got {other:?}"),
3744 }
3745
3746 assert_eq!(
3747 machine.read(&stream("stale-cold"), 0, 32).expect("read"),
3748 StreamRead {
3749 offset: 0,
3750 next_offset: 17,
3751 content_type: "application/octet-stream".to_owned(),
3752 payload: b"abcdefghijklmnopq".to_vec(),
3753 up_to_date: true,
3754 closed: false,
3755 }
3756 );
3757 }
3758
3759 #[test]
3760 fn plan_next_cold_flush_skips_soft_deleted_streams() {
3761 let mut machine = StreamStateMachine::new();
3762 create_bucket(&mut machine);
3763 create_stream(&mut machine, "a-gone");
3764 create_stream(&mut machine, "b-live");
3765 assert!(matches!(
3766 machine.apply(StreamCommand::Append {
3767 stream_id: stream("a-gone"),
3768 content_type: Some("application/octet-stream".to_owned()),
3769 payload: b"gone".to_vec(),
3770 close_after: false,
3771 stream_seq: None,
3772 producer: None,
3773 now_ms: 0,
3774 }),
3775 StreamResponse::Appended { .. }
3776 ));
3777 assert!(matches!(
3778 machine.apply(StreamCommand::AddForkRef {
3779 stream_id: stream("a-gone"),
3780 now_ms: 0,
3781 }),
3782 StreamResponse::ForkRefAdded { .. }
3783 ));
3784 assert_eq!(
3785 machine.apply(StreamCommand::DeleteStream {
3786 stream_id: stream("a-gone"),
3787 }),
3788 StreamResponse::Deleted {
3789 hard_deleted: false,
3790 parent_to_release: None,
3791 }
3792 );
3793 assert!(matches!(
3794 machine.apply(StreamCommand::Append {
3795 stream_id: stream("b-live"),
3796 content_type: Some("application/octet-stream".to_owned()),
3797 payload: b"live".to_vec(),
3798 close_after: false,
3799 stream_seq: None,
3800 producer: None,
3801 now_ms: 0,
3802 }),
3803 StreamResponse::Appended { .. }
3804 ));
3805
3806 let candidate = machine
3807 .plan_next_cold_flush(4, 4)
3808 .expect("plan next cold flush")
3809 .expect("candidate");
3810 assert_eq!(candidate.stream_id, stream("b-live"));
3811 assert_eq!(candidate.payload, b"live");
3812 }
3813
3814 #[test]
3815 fn hot_payload_byte_metrics_follow_cold_flush() {
3816 let mut machine = StreamStateMachine::new();
3817 create_bucket(&mut machine);
3818 create_stream(&mut machine, "hot-a");
3819 create_stream(&mut machine, "hot-b");
3820 for (stream_name, payload) in [("hot-a", b"abcd".as_slice()), ("hot-b", b"xy".as_slice())] {
3821 assert!(matches!(
3822 machine.apply(StreamCommand::Append {
3823 stream_id: stream(stream_name),
3824 content_type: Some("application/octet-stream".to_owned()),
3825 payload: payload.to_vec(),
3826 close_after: false,
3827 stream_seq: None,
3828 producer: None,
3829 now_ms: 0,
3830 }),
3831 StreamResponse::Appended { .. }
3832 ));
3833 }
3834
3835 assert_eq!(machine.hot_payload_len(&stream("hot-a")), Ok(4));
3836 assert_eq!(machine.hot_payload_len(&stream("hot-b")), Ok(2));
3837 assert_eq!(machine.total_hot_payload_bytes(), 6);
3838
3839 assert_eq!(
3840 machine.apply(StreamCommand::FlushCold {
3841 stream_id: stream("hot-a"),
3842 chunk: ColdChunkRef {
3843 start_offset: 0,
3844 end_offset: 3,
3845 s3_path: "s3://bucket/hot-a/000000".to_owned(),
3846 object_size: 3,
3847 },
3848 }),
3849 StreamResponse::ColdFlushed {
3850 hot_start_offset: 3,
3851 }
3852 );
3853 assert_eq!(machine.hot_payload_len(&stream("hot-a")), Ok(1));
3854 assert_eq!(machine.total_hot_payload_bytes(), 3);
3855 }
3856
3857 #[test]
3858 fn snapshot_restore_round_trips_payload_metadata_and_stream_seq() {
3859 let mut machine = StreamStateMachine::new();
3860 create_bucket(&mut machine);
3861 assert_eq!(
3862 machine.apply(StreamCommand::CreateStream {
3863 stream_id: stream("snap-open"),
3864 content_type: "application/octet-stream".to_owned(),
3865 initial_payload: b"hi".to_vec(),
3866 close_after: false,
3867 stream_seq: Some("0001".to_owned()),
3868 producer: None,
3869 stream_ttl_seconds: Some(60),
3870 stream_expires_at_ms: None,
3871 forked_from: None,
3872 fork_offset: None,
3873 now_ms: 0,
3874 }),
3875 StreamResponse::Created {
3876 stream_id: stream("snap-open"),
3877 next_offset: 2,
3878 closed: false,
3879 }
3880 );
3881 assert!(matches!(
3882 machine.apply(StreamCommand::Append {
3883 stream_id: stream("snap-open"),
3884 content_type: Some("application/octet-stream".to_owned()),
3885 payload: b"abc".to_vec(),
3886 close_after: false,
3887 stream_seq: Some("0002".to_owned()),
3888 producer: None,
3889 now_ms: 0,
3890 }),
3891 StreamResponse::Appended {
3892 offset: 2,
3893 next_offset: 5,
3894 ..
3895 }
3896 ));
3897 assert_eq!(
3898 machine.apply(StreamCommand::CreateStream {
3899 stream_id: stream("snap-closed"),
3900 content_type: "application/octet-stream".to_owned(),
3901 initial_payload: b"x".to_vec(),
3902 close_after: true,
3903 stream_seq: None,
3904 producer: None,
3905 stream_ttl_seconds: None,
3906 stream_expires_at_ms: None,
3907 forked_from: None,
3908 fork_offset: None,
3909 now_ms: 0,
3910 }),
3911 StreamResponse::Created {
3912 stream_id: stream("snap-closed"),
3913 next_offset: 1,
3914 closed: true,
3915 }
3916 );
3917
3918 let encoded = serde_json::to_vec(&machine.snapshot()).expect("serialize snapshot");
3919 let decoded =
3920 serde_json::from_slice::<StreamSnapshot>(&encoded).expect("deserialize snapshot");
3921 let mut restored = StreamStateMachine::restore(decoded).expect("restore snapshot");
3922
3923 assert_eq!(
3924 restored.read(&stream("snap-open"), 0, 16).expect("read"),
3925 StreamRead {
3926 offset: 0,
3927 next_offset: 5,
3928 content_type: "application/octet-stream".to_owned(),
3929 payload: b"hiabc".to_vec(),
3930 up_to_date: true,
3931 closed: false,
3932 }
3933 );
3934 let metadata = restored.head(&stream("snap-open")).expect("metadata");
3935 assert_eq!(metadata.last_stream_seq.as_deref(), Some("0002"));
3936 assert_eq!(metadata.stream_ttl_seconds, Some(60));
3937 assert_eq!(metadata.stream_expires_at_ms, None);
3938
3939 assert!(matches!(
3940 restored.apply(StreamCommand::Append {
3941 stream_id: stream("snap-open"),
3942 content_type: Some("application/octet-stream".to_owned()),
3943 payload: b"bad".to_vec(),
3944 close_after: false,
3945 stream_seq: Some("0002".to_owned()),
3946 producer: None,
3947 now_ms: 0,
3948 }),
3949 StreamResponse::Error {
3950 code: StreamErrorCode::StreamSeqConflict,
3951 next_offset: Some(5),
3952 ..
3953 }
3954 ));
3955 assert_eq!(
3956 restored.apply(StreamCommand::Append {
3957 stream_id: stream("snap-open"),
3958 content_type: Some("application/octet-stream".to_owned()),
3959 payload: b"!".to_vec(),
3960 close_after: false,
3961 stream_seq: Some("0003".to_owned()),
3962 producer: None,
3963 now_ms: 0,
3964 }),
3965 StreamResponse::Appended {
3966 offset: 5,
3967 next_offset: 6,
3968 closed: false,
3969 deduplicated: false,
3970 producer: None,
3971 }
3972 );
3973 assert!(matches!(
3974 restored.apply(StreamCommand::Append {
3975 stream_id: stream("snap-closed"),
3976 content_type: Some("application/octet-stream".to_owned()),
3977 payload: b"!".to_vec(),
3978 close_after: false,
3979 stream_seq: None,
3980 producer: None,
3981 now_ms: 0,
3982 }),
3983 StreamResponse::Error {
3984 code: StreamErrorCode::StreamClosed,
3985 next_offset: Some(1),
3986 ..
3987 }
3988 ));
3989 }
3990
3991 #[test]
3992 fn snapshot_order_is_deterministic() {
3993 let mut machine = StreamStateMachine::new();
3994 for bucket_id in ["zzzz", "benchcmp", "aaaa"] {
3995 machine.apply(StreamCommand::CreateBucket {
3996 bucket_id: bucket_id.to_owned(),
3997 });
3998 }
3999 for stream_id in [
4000 BucketStreamId::new("zzzz", "stream-b"),
4001 BucketStreamId::new("benchcmp", "stream-b"),
4002 BucketStreamId::new("benchcmp", "stream-a"),
4003 BucketStreamId::new("aaaa", "stream-z"),
4004 ] {
4005 assert!(matches!(
4006 machine.apply(StreamCommand::CreateStream {
4007 stream_id,
4008 content_type: "application/octet-stream".to_owned(),
4009 initial_payload: Vec::new(),
4010 close_after: false,
4011 stream_seq: None,
4012 producer: None,
4013 stream_ttl_seconds: None,
4014 stream_expires_at_ms: None,
4015 forked_from: None,
4016 fork_offset: None,
4017 now_ms: 0,
4018 }),
4019 StreamResponse::Created { .. }
4020 ));
4021 }
4022
4023 let snapshot = machine.snapshot();
4024 assert_eq!(snapshot.buckets, ["aaaa", "benchcmp", "zzzz"]);
4025 assert_eq!(
4026 snapshot
4027 .streams
4028 .iter()
4029 .map(|entry| entry.metadata.stream_id.to_string())
4030 .collect::<Vec<_>>(),
4031 [
4032 "aaaa/stream-z",
4033 "benchcmp/stream-a",
4034 "benchcmp/stream-b",
4035 "zzzz/stream-b",
4036 ]
4037 );
4038 }
4039
4040 #[test]
4041 fn snapshot_restore_rejects_invalid_entries() {
4042 assert_eq!(
4043 StreamStateMachine::restore(StreamSnapshot {
4044 buckets: vec!["benchcmp".to_owned(), "benchcmp".to_owned()],
4045 streams: Vec::new(),
4046 })
4047 .expect_err("duplicate bucket"),
4048 StreamSnapshotError::DuplicateBucket("benchcmp".to_owned())
4049 );
4050
4051 assert!(matches!(
4052 StreamStateMachine::restore(StreamSnapshot {
4053 buckets: vec!["benchcmp".to_owned()],
4054 streams: vec![StreamSnapshotEntry {
4055 metadata: StreamMetadata {
4056 stream_id: BucketStreamId::new("missing", "stream"),
4057 content_type: "application/octet-stream".to_owned(),
4058 status: StreamStatus::Open,
4059 tail_offset: 0,
4060 last_stream_seq: None,
4061 stream_ttl_seconds: None,
4062 stream_expires_at_ms: None,
4063 created_at_ms: 0,
4064 last_ttl_touch_at_ms: 0,
4065 forked_from: None,
4066 fork_offset: None,
4067 fork_ref_count: 0,
4068 },
4069 hot_start_offset: 0,
4070 payload: Vec::new(),
4071 hot_segments: Vec::new(),
4072 cold_chunks: Vec::new(),
4073 external_segments: Vec::new(),
4074 message_records: Vec::new(),
4075 visible_snapshot: None,
4076 producer_states: Vec::new(),
4077 }],
4078 }),
4079 Err(StreamSnapshotError::MissingBucket(_))
4080 ));
4081
4082 assert!(matches!(
4083 StreamStateMachine::restore(StreamSnapshot {
4084 buckets: vec!["benchcmp".to_owned()],
4085 streams: vec![StreamSnapshotEntry {
4086 metadata: StreamMetadata {
4087 stream_id: stream("bad-len"),
4088 content_type: "application/octet-stream".to_owned(),
4089 status: StreamStatus::Open,
4090 tail_offset: 2,
4091 last_stream_seq: None,
4092 stream_ttl_seconds: None,
4093 stream_expires_at_ms: None,
4094 created_at_ms: 0,
4095 last_ttl_touch_at_ms: 0,
4096 forked_from: None,
4097 fork_offset: None,
4098 fork_ref_count: 0,
4099 },
4100 hot_start_offset: 0,
4101 payload: b"x".to_vec(),
4102 hot_segments: Vec::new(),
4103 cold_chunks: Vec::new(),
4104 external_segments: Vec::new(),
4105 message_records: Vec::new(),
4106 visible_snapshot: None,
4107 producer_states: Vec::new(),
4108 }],
4109 }),
4110 Err(StreamSnapshotError::PayloadLengthMismatch { .. })
4111 ));
4112
4113 assert!(matches!(
4114 StreamStateMachine::restore(StreamSnapshot {
4115 buckets: vec!["benchcmp".to_owned()],
4116 streams: vec![StreamSnapshotEntry {
4117 metadata: StreamMetadata {
4118 stream_id: stream("duplicate-producer"),
4119 content_type: "application/octet-stream".to_owned(),
4120 status: StreamStatus::Open,
4121 tail_offset: 0,
4122 last_stream_seq: None,
4123 stream_ttl_seconds: None,
4124 stream_expires_at_ms: None,
4125 created_at_ms: 0,
4126 last_ttl_touch_at_ms: 0,
4127 forked_from: None,
4128 fork_offset: None,
4129 fork_ref_count: 0,
4130 },
4131 hot_start_offset: 0,
4132 payload: Vec::new(),
4133 hot_segments: Vec::new(),
4134 cold_chunks: Vec::new(),
4135 external_segments: Vec::new(),
4136 message_records: Vec::new(),
4137 visible_snapshot: None,
4138 producer_states: vec![
4139 ProducerSnapshot {
4140 producer_id: "writer-1".to_owned(),
4141 producer_epoch: 0,
4142 producer_seq: 0,
4143 last_start_offset: 0,
4144 last_next_offset: 0,
4145 last_closed: false,
4146 last_items: Vec::new(),
4147 },
4148 ProducerSnapshot {
4149 producer_id: "writer-1".to_owned(),
4150 producer_epoch: 1,
4151 producer_seq: 0,
4152 last_start_offset: 0,
4153 last_next_offset: 0,
4154 last_closed: false,
4155 last_items: Vec::new(),
4156 },
4157 ],
4158 }],
4159 }),
4160 Err(StreamSnapshotError::DuplicateProducer { .. })
4161 ));
4162 }
4163
4164 #[test]
4165 fn close_is_monotonic_and_close_only_is_idempotent() {
4166 let mut machine = StreamStateMachine::new();
4167 create_bucket(&mut machine);
4168 create_stream(&mut machine, "s-1");
4169
4170 assert_eq!(
4171 machine.apply(StreamCommand::Append {
4172 stream_id: stream("s-1"),
4173 content_type: Some("application/octet-stream".to_owned()),
4174 payload: b"abc".to_vec(),
4175 close_after: true,
4176 stream_seq: None,
4177 producer: None,
4178 now_ms: 0,
4179 }),
4180 StreamResponse::Appended {
4181 offset: 0,
4182 next_offset: 3,
4183 closed: true,
4184 deduplicated: false,
4185 producer: None,
4186 }
4187 );
4188 assert_eq!(
4189 machine.apply(StreamCommand::Close {
4190 stream_id: stream("s-1"),
4191 stream_seq: None,
4192 producer: None,
4193 now_ms: 0,
4194 }),
4195 StreamResponse::Closed {
4196 next_offset: 3,
4197 deduplicated: false,
4198 producer: None,
4199 }
4200 );
4201 assert!(matches!(
4202 machine.apply(StreamCommand::Append {
4203 stream_id: stream("s-1"),
4204 content_type: Some("application/octet-stream".to_owned()),
4205 payload: b"x".to_vec(),
4206 close_after: false,
4207 stream_seq: None,
4208 producer: None,
4209 now_ms: 0,
4210 }),
4211 StreamResponse::Error {
4212 code: StreamErrorCode::StreamClosed,
4213 next_offset: Some(3),
4214 ..
4215 }
4216 ));
4217 }
4218
4219 #[test]
4220 fn stream_seq_must_strictly_increase() {
4221 let mut machine = StreamStateMachine::new();
4222 create_bucket(&mut machine);
4223 create_stream(&mut machine, "s-1");
4224
4225 assert!(matches!(
4226 machine.apply(StreamCommand::Append {
4227 stream_id: stream("s-1"),
4228 content_type: Some("application/octet-stream".to_owned()),
4229 payload: b"a".to_vec(),
4230 close_after: false,
4231 stream_seq: Some("0002".to_owned()),
4232 producer: None,
4233 now_ms: 0,
4234 }),
4235 StreamResponse::Appended { .. }
4236 ));
4237 assert!(matches!(
4238 machine.apply(StreamCommand::Append {
4239 stream_id: stream("s-1"),
4240 content_type: Some("application/octet-stream".to_owned()),
4241 payload: b"b".to_vec(),
4242 close_after: false,
4243 stream_seq: Some("0002".to_owned()),
4244 producer: None,
4245 now_ms: 0,
4246 }),
4247 StreamResponse::Error {
4248 code: StreamErrorCode::StreamSeqConflict,
4249 next_offset: Some(1),
4250 ..
4251 }
4252 ));
4253 assert!(matches!(
4254 machine.apply(StreamCommand::Append {
4255 stream_id: stream("s-1"),
4256 content_type: Some("application/octet-stream".to_owned()),
4257 payload: b"c".to_vec(),
4258 close_after: false,
4259 stream_seq: Some("0003".to_owned()),
4260 producer: None,
4261 now_ms: 0,
4262 }),
4263 StreamResponse::Appended {
4264 offset: 1,
4265 next_offset: 2,
4266 ..
4267 }
4268 ));
4269 }
4270
4271 #[test]
4272 fn producer_headers_deduplicate_retries_and_fence_stale_epochs() {
4273 let mut machine = StreamStateMachine::new();
4274 create_bucket(&mut machine);
4275 create_stream(&mut machine, "producer-stream");
4276
4277 assert_eq!(
4278 machine.apply(StreamCommand::Append {
4279 stream_id: stream("producer-stream"),
4280 content_type: Some("application/octet-stream".to_owned()),
4281 payload: b"a".to_vec(),
4282 close_after: false,
4283 stream_seq: None,
4284 producer: Some(producer("writer-1", 0, 0)),
4285 now_ms: 0,
4286 }),
4287 StreamResponse::Appended {
4288 offset: 0,
4289 next_offset: 1,
4290 closed: false,
4291 deduplicated: false,
4292 producer: Some(producer("writer-1", 0, 0)),
4293 }
4294 );
4295 assert_eq!(
4296 machine.apply(StreamCommand::Append {
4297 stream_id: stream("producer-stream"),
4298 content_type: Some("application/octet-stream".to_owned()),
4299 payload: b"ignored-retry-body".to_vec(),
4300 close_after: false,
4301 stream_seq: None,
4302 producer: Some(producer("writer-1", 0, 0)),
4303 now_ms: 0,
4304 }),
4305 StreamResponse::Appended {
4306 offset: 0,
4307 next_offset: 1,
4308 closed: false,
4309 deduplicated: true,
4310 producer: Some(producer("writer-1", 0, 0)),
4311 }
4312 );
4313 assert_eq!(
4314 machine
4315 .read(&stream("producer-stream"), 0, 16)
4316 .expect("read")
4317 .payload,
4318 b"a"
4319 );
4320
4321 assert!(matches!(
4322 machine.apply(StreamCommand::Append {
4323 stream_id: stream("producer-stream"),
4324 content_type: Some("application/octet-stream".to_owned()),
4325 payload: b"gap".to_vec(),
4326 close_after: false,
4327 stream_seq: None,
4328 producer: Some(producer("writer-1", 0, 2)),
4329 now_ms: 0,
4330 }),
4331 StreamResponse::Error {
4332 code: StreamErrorCode::ProducerSeqConflict,
4333 ..
4334 }
4335 ));
4336
4337 assert_eq!(
4338 machine.apply(StreamCommand::Append {
4339 stream_id: stream("producer-stream"),
4340 content_type: Some("application/octet-stream".to_owned()),
4341 payload: b"b".to_vec(),
4342 close_after: false,
4343 stream_seq: None,
4344 producer: Some(producer("writer-1", 1, 0)),
4345 now_ms: 0,
4346 }),
4347 StreamResponse::Appended {
4348 offset: 1,
4349 next_offset: 2,
4350 closed: false,
4351 deduplicated: false,
4352 producer: Some(producer("writer-1", 1, 0)),
4353 }
4354 );
4355 assert!(matches!(
4356 machine.apply(StreamCommand::Append {
4357 stream_id: stream("producer-stream"),
4358 content_type: Some("application/octet-stream".to_owned()),
4359 payload: b"stale".to_vec(),
4360 close_after: false,
4361 stream_seq: None,
4362 producer: Some(producer("writer-1", 0, 1)),
4363 now_ms: 0,
4364 }),
4365 StreamResponse::Error {
4366 code: StreamErrorCode::ProducerEpochStale,
4367 ..
4368 }
4369 ));
4370 }
4371
4372 #[test]
4373 fn producer_append_batch_deduplicates_retries_without_partial_mutation() {
4374 let mut machine = StreamStateMachine::new();
4375 create_bucket(&mut machine);
4376 create_stream(&mut machine, "producer-batch");
4377
4378 let first_payloads = [b"ab".as_slice(), b"c".as_slice()];
4379 let first = machine
4380 .append_batch_borrowed(
4381 stream("producer-batch"),
4382 Some("application/octet-stream"),
4383 &first_payloads,
4384 Some(producer("writer-1", 0, 0)),
4385 0,
4386 )
4387 .expect("first batch");
4388 assert_eq!(
4389 first.items,
4390 vec![
4391 StreamBatchAppendItem {
4392 offset: 0,
4393 next_offset: 2,
4394 closed: false,
4395 deduplicated: false,
4396 },
4397 StreamBatchAppendItem {
4398 offset: 2,
4399 next_offset: 3,
4400 closed: false,
4401 deduplicated: false,
4402 },
4403 ]
4404 );
4405 assert!(!first.deduplicated);
4406
4407 let duplicate = machine
4408 .append_batch_borrowed(
4409 stream("producer-batch"),
4410 Some("application/octet-stream"),
4411 &first_payloads,
4412 Some(producer("writer-1", 0, 0)),
4413 0,
4414 )
4415 .expect("duplicate batch");
4416 assert!(duplicate.deduplicated);
4417 assert!(duplicate.items.iter().all(|item| item.deduplicated));
4418 assert_eq!(duplicate.items[0].offset, 0);
4419 assert_eq!(duplicate.items[1].next_offset, 3);
4420 assert_eq!(
4421 machine
4422 .read(&stream("producer-batch"), 0, 16)
4423 .expect("read")
4424 .payload,
4425 b"abc"
4426 );
4427
4428 let invalid_payloads = [b"".as_slice()];
4429 assert!(matches!(
4430 machine.append_batch_borrowed(
4431 stream("producer-batch"),
4432 Some("application/octet-stream"),
4433 &invalid_payloads,
4434 Some(producer("writer-1", 0, 1)),
4435 0,
4436 ),
4437 Err(StreamResponse::Error {
4438 code: StreamErrorCode::EmptyAppend,
4439 ..
4440 })
4441 ));
4442
4443 let next_payloads = [b"d".as_slice()];
4444 let next = machine
4445 .append_batch_borrowed(
4446 stream("producer-batch"),
4447 Some("application/octet-stream"),
4448 &next_payloads,
4449 Some(producer("writer-1", 0, 1)),
4450 0,
4451 )
4452 .expect("next batch");
4453 assert_eq!(next.items[0].offset, 3);
4454 assert_eq!(
4455 machine
4456 .read(&stream("producer-batch"), 0, 16)
4457 .expect("read")
4458 .payload,
4459 b"abcd"
4460 );
4461 }
4462
4463 #[test]
4464 fn producer_state_survives_snapshot_restore() {
4465 let mut machine = StreamStateMachine::new();
4466 create_bucket(&mut machine);
4467 create_stream(&mut machine, "producer-snapshot");
4468 assert!(matches!(
4469 machine.apply(StreamCommand::Append {
4470 stream_id: stream("producer-snapshot"),
4471 content_type: Some("application/octet-stream".to_owned()),
4472 payload: b"a".to_vec(),
4473 close_after: false,
4474 stream_seq: None,
4475 producer: Some(producer("writer-1", 0, 0)),
4476 now_ms: 0,
4477 }),
4478 StreamResponse::Appended {
4479 deduplicated: false,
4480 ..
4481 }
4482 ));
4483
4484 let snapshot = machine.snapshot();
4485 assert_eq!(snapshot.streams[0].producer_states.len(), 1);
4486 assert_eq!(
4487 snapshot.streams[0].producer_states[0].last_items,
4488 vec![ProducerAppendRecord {
4489 start_offset: 0,
4490 next_offset: 1,
4491 closed: false,
4492 }]
4493 );
4494 let mut restored = StreamStateMachine::restore(snapshot).expect("restore snapshot");
4495
4496 assert!(matches!(
4497 restored.apply(StreamCommand::Append {
4498 stream_id: stream("producer-snapshot"),
4499 content_type: Some("application/octet-stream".to_owned()),
4500 payload: b"retry".to_vec(),
4501 close_after: false,
4502 stream_seq: None,
4503 producer: Some(producer("writer-1", 0, 0)),
4504 now_ms: 0,
4505 }),
4506 StreamResponse::Appended {
4507 offset: 0,
4508 next_offset: 1,
4509 deduplicated: true,
4510 ..
4511 }
4512 ));
4513 assert_eq!(
4514 restored.apply(StreamCommand::Append {
4515 stream_id: stream("producer-snapshot"),
4516 content_type: Some("application/octet-stream".to_owned()),
4517 payload: b"b".to_vec(),
4518 close_after: false,
4519 stream_seq: None,
4520 producer: Some(producer("writer-1", 0, 1)),
4521 now_ms: 0,
4522 }),
4523 StreamResponse::Appended {
4524 offset: 1,
4525 next_offset: 2,
4526 closed: false,
4527 deduplicated: false,
4528 producer: Some(producer("writer-1", 0, 1)),
4529 }
4530 );
4531 }
4532
4533 #[test]
4534 fn stream_ttl_uses_sliding_access_window() {
4535 let mut machine = StreamStateMachine::new();
4536 create_bucket(&mut machine);
4537 let stream_id = stream("ttl-window");
4538
4539 assert_eq!(
4540 machine.apply(StreamCommand::CreateStream {
4541 stream_id: stream_id.clone(),
4542 content_type: "application/octet-stream".to_owned(),
4543 initial_payload: b"hi".to_vec(),
4544 close_after: false,
4545 stream_seq: None,
4546 producer: None,
4547 stream_ttl_seconds: Some(1),
4548 stream_expires_at_ms: None,
4549 forked_from: None,
4550 fork_offset: None,
4551 now_ms: 1_000,
4552 }),
4553 StreamResponse::Created {
4554 stream_id: stream_id.clone(),
4555 next_offset: 2,
4556 closed: false,
4557 }
4558 );
4559
4560 assert_eq!(
4561 machine.access_requires_write(&stream_id, 1_500, false),
4562 Ok(false)
4563 );
4564 assert_eq!(
4565 machine
4566 .head_at(&stream_id, 1_500)
4567 .expect("head before ttl expiry")
4568 .last_ttl_touch_at_ms,
4569 1_000
4570 );
4571 assert_eq!(
4572 machine.access_requires_write(&stream_id, 1_500, true),
4573 Ok(true)
4574 );
4575 assert_eq!(
4576 machine.apply(StreamCommand::TouchStreamAccess {
4577 stream_id: stream_id.clone(),
4578 now_ms: 1_500,
4579 renew_ttl: true,
4580 }),
4581 StreamResponse::Accessed {
4582 changed: true,
4583 expired: false,
4584 }
4585 );
4586
4587 assert!(machine.read_plan_at(&stream_id, 2, 16, 2_400).is_ok());
4588 assert_eq!(
4589 machine.apply(StreamCommand::Append {
4590 stream_id: stream_id.clone(),
4591 content_type: Some("application/octet-stream".to_owned()),
4592 payload: b"!".to_vec(),
4593 close_after: false,
4594 stream_seq: None,
4595 producer: None,
4596 now_ms: 2_400,
4597 }),
4598 StreamResponse::Appended {
4599 offset: 2,
4600 next_offset: 3,
4601 closed: false,
4602 deduplicated: false,
4603 producer: None,
4604 }
4605 );
4606 assert!(machine.head_at(&stream_id, 3_399).is_some());
4607 assert!(machine.head_at(&stream_id, 3_400).is_none());
4608 assert!(matches!(
4609 machine.apply(StreamCommand::Append {
4610 stream_id: stream_id.clone(),
4611 content_type: Some("application/octet-stream".to_owned()),
4612 payload: b"late".to_vec(),
4613 close_after: false,
4614 stream_seq: None,
4615 producer: None,
4616 now_ms: 3_401,
4617 }),
4618 StreamResponse::Error {
4619 code: StreamErrorCode::StreamNotFound,
4620 ..
4621 }
4622 ));
4623 }
4624
4625 #[test]
4626 fn stream_expires_at_is_absolute_and_recreate_after_expiry() {
4627 let mut machine = StreamStateMachine::new();
4628 create_bucket(&mut machine);
4629 let stream_id = stream("absolute-expiry");
4630
4631 assert!(matches!(
4632 machine.apply(StreamCommand::CreateStream {
4633 stream_id: stream_id.clone(),
4634 content_type: "application/octet-stream".to_owned(),
4635 initial_payload: Vec::new(),
4636 close_after: false,
4637 stream_seq: None,
4638 producer: None,
4639 stream_ttl_seconds: None,
4640 stream_expires_at_ms: Some(2_000),
4641 forked_from: None,
4642 fork_offset: None,
4643 now_ms: 1_000,
4644 }),
4645 StreamResponse::Created { .. }
4646 ));
4647 assert_eq!(
4648 machine.apply(StreamCommand::TouchStreamAccess {
4649 stream_id: stream_id.clone(),
4650 now_ms: 1_500,
4651 renew_ttl: true,
4652 }),
4653 StreamResponse::Accessed {
4654 changed: false,
4655 expired: false,
4656 }
4657 );
4658 assert!(matches!(
4659 machine.apply(StreamCommand::Append {
4660 stream_id: stream_id.clone(),
4661 content_type: Some("application/octet-stream".to_owned()),
4662 payload: b"body".to_vec(),
4663 close_after: false,
4664 stream_seq: None,
4665 producer: None,
4666 now_ms: 1_600,
4667 }),
4668 StreamResponse::Appended { .. }
4669 ));
4670 assert!(machine.read_plan_at(&stream_id, 0, 16, 1_999).is_ok());
4671 assert!(matches!(
4672 machine.read_plan_at(&stream_id, 0, 16, 2_000),
4673 Err(StreamResponse::Error {
4674 code: StreamErrorCode::StreamNotFound,
4675 ..
4676 })
4677 ));
4678 assert_eq!(
4679 machine.apply(StreamCommand::CreateStream {
4680 stream_id: stream_id.clone(),
4681 content_type: "text/plain".to_owned(),
4682 initial_payload: Vec::new(),
4683 close_after: false,
4684 stream_seq: None,
4685 producer: None,
4686 stream_ttl_seconds: None,
4687 stream_expires_at_ms: None,
4688 forked_from: None,
4689 fork_offset: None,
4690 now_ms: 2_001,
4691 }),
4692 StreamResponse::Created {
4693 stream_id,
4694 next_offset: 0,
4695 closed: false,
4696 }
4697 );
4698 }
4699
4700 #[test]
4701 fn producer_duplicate_final_append_remains_idempotent_after_close() {
4702 let mut machine = StreamStateMachine::new();
4703 create_bucket(&mut machine);
4704 create_stream(&mut machine, "producer-close");
4705
4706 assert_eq!(
4707 machine.apply(StreamCommand::Append {
4708 stream_id: stream("producer-close"),
4709 content_type: Some("application/octet-stream".to_owned()),
4710 payload: b"final".to_vec(),
4711 close_after: true,
4712 stream_seq: None,
4713 producer: Some(producer("writer-1", 0, 0)),
4714 now_ms: 0,
4715 }),
4716 StreamResponse::Appended {
4717 offset: 0,
4718 next_offset: 5,
4719 closed: true,
4720 deduplicated: false,
4721 producer: Some(producer("writer-1", 0, 0)),
4722 }
4723 );
4724 assert_eq!(
4725 machine.apply(StreamCommand::Append {
4726 stream_id: stream("producer-close"),
4727 content_type: Some("application/octet-stream".to_owned()),
4728 payload: b"final".to_vec(),
4729 close_after: true,
4730 stream_seq: None,
4731 producer: Some(producer("writer-1", 0, 0)),
4732 now_ms: 0,
4733 }),
4734 StreamResponse::Appended {
4735 offset: 0,
4736 next_offset: 5,
4737 closed: true,
4738 deduplicated: true,
4739 producer: Some(producer("writer-1", 0, 0)),
4740 }
4741 );
4742 assert!(matches!(
4743 machine.apply(StreamCommand::Append {
4744 stream_id: stream("producer-close"),
4745 content_type: Some("application/octet-stream".to_owned()),
4746 payload: b"too-late".to_vec(),
4747 close_after: false,
4748 stream_seq: None,
4749 producer: Some(producer("writer-1", 0, 1)),
4750 now_ms: 0,
4751 }),
4752 StreamResponse::Error {
4753 code: StreamErrorCode::StreamClosed,
4754 next_offset: Some(5),
4755 ..
4756 }
4757 ));
4758 }
4759
4760 #[test]
4761 fn append_conflict_precedence_reports_closed_before_mismatch_or_seq() {
4762 let mut machine = StreamStateMachine::new();
4763 create_bucket(&mut machine);
4764 create_stream(&mut machine, "closed-precedence");
4765
4766 assert_eq!(
4767 machine.apply(StreamCommand::Append {
4768 stream_id: stream("closed-precedence"),
4769 content_type: Some("application/octet-stream".to_owned()),
4770 payload: b"final".to_vec(),
4771 close_after: true,
4772 stream_seq: Some("0002".to_owned()),
4773 producer: None,
4774 now_ms: 0,
4775 }),
4776 StreamResponse::Appended {
4777 offset: 0,
4778 next_offset: 5,
4779 closed: true,
4780 deduplicated: false,
4781 producer: None,
4782 }
4783 );
4784
4785 assert!(matches!(
4786 machine.apply(StreamCommand::Append {
4787 stream_id: stream("closed-precedence"),
4788 content_type: Some("text/plain".to_owned()),
4789 payload: b"too-late".to_vec(),
4790 close_after: false,
4791 stream_seq: Some("0001".to_owned()),
4792 producer: None,
4793 now_ms: 0,
4794 }),
4795 StreamResponse::Error {
4796 code: StreamErrorCode::StreamClosed,
4797 next_offset: Some(5),
4798 ..
4799 }
4800 ));
4801 }
4802
4803 #[test]
4804 fn bucket_delete_requires_empty_bucket() {
4805 let mut machine = StreamStateMachine::new();
4806 create_bucket(&mut machine);
4807 create_stream(&mut machine, "s-1");
4808
4809 assert!(matches!(
4810 machine.apply(StreamCommand::DeleteBucket {
4811 bucket_id: "benchcmp".to_owned(),
4812 }),
4813 StreamResponse::Error {
4814 code: StreamErrorCode::BucketNotEmpty,
4815 ..
4816 }
4817 ));
4818 assert_eq!(
4819 machine.apply(StreamCommand::DeleteStream {
4820 stream_id: stream("s-1"),
4821 }),
4822 StreamResponse::Deleted {
4823 hard_deleted: true,
4824 parent_to_release: None,
4825 }
4826 );
4827 assert_eq!(
4828 machine.apply(StreamCommand::DeleteBucket {
4829 bucket_id: "benchcmp".to_owned(),
4830 }),
4831 StreamResponse::BucketDeleted {
4832 bucket_id: "benchcmp".to_owned(),
4833 }
4834 );
4835 }
4836
4837 #[test]
4838 fn fork_refs_soft_delete_and_release_parent_on_last_child() {
4839 let mut machine = StreamStateMachine::new();
4840 create_bucket(&mut machine);
4841 create_stream(&mut machine, "source");
4842 create_stream(&mut machine, "fork");
4843
4844 assert_eq!(
4845 machine.apply(StreamCommand::AddForkRef {
4846 stream_id: stream("source"),
4847 now_ms: 0,
4848 }),
4849 StreamResponse::ForkRefAdded { fork_ref_count: 1 }
4850 );
4851 assert_eq!(
4852 machine.apply(StreamCommand::DeleteStream {
4853 stream_id: stream("source"),
4854 }),
4855 StreamResponse::Deleted {
4856 hard_deleted: false,
4857 parent_to_release: None,
4858 }
4859 );
4860 assert!(matches!(
4861 machine.read_plan(&stream("source"), 0, 1),
4862 Err(StreamResponse::Error {
4863 code: StreamErrorCode::StreamGone,
4864 ..
4865 })
4866 ));
4867 assert_eq!(
4868 machine.apply(StreamCommand::DeleteStream {
4869 stream_id: stream("fork"),
4870 }),
4871 StreamResponse::Deleted {
4872 hard_deleted: true,
4873 parent_to_release: None,
4874 }
4875 );
4876 assert_eq!(
4877 machine.apply(StreamCommand::ReleaseForkRef {
4878 stream_id: stream("source"),
4879 }),
4880 StreamResponse::ForkRefReleased {
4881 hard_deleted: true,
4882 fork_ref_count: 0,
4883 parent_to_release: None,
4884 }
4885 );
4886 assert!(machine.head(&stream("source")).is_none());
4887 }
4888
4889 #[test]
4890 fn publish_snapshot_advances_retention_on_message_boundary() {
4891 let mut machine = StreamStateMachine::new();
4892 create_bucket(&mut machine);
4893 create_stream(&mut machine, "snap");
4894 assert!(matches!(
4895 machine.apply(StreamCommand::Append {
4896 stream_id: stream("snap"),
4897 content_type: Some("application/octet-stream".to_owned()),
4898 payload: b"abc".to_vec(),
4899 close_after: false,
4900 stream_seq: None,
4901 producer: None,
4902 now_ms: 0,
4903 }),
4904 StreamResponse::Appended {
4905 offset: 0,
4906 next_offset: 3,
4907 ..
4908 }
4909 ));
4910 assert!(matches!(
4911 machine.apply(StreamCommand::Append {
4912 stream_id: stream("snap"),
4913 content_type: Some("application/octet-stream".to_owned()),
4914 payload: b"de".to_vec(),
4915 close_after: false,
4916 stream_seq: None,
4917 producer: None,
4918 now_ms: 0,
4919 }),
4920 StreamResponse::Appended {
4921 offset: 3,
4922 next_offset: 5,
4923 ..
4924 }
4925 ));
4926
4927 assert_eq!(
4928 machine.apply(StreamCommand::PublishSnapshot {
4929 stream_id: stream("snap"),
4930 snapshot_offset: 3,
4931 content_type: "application/json".to_owned(),
4932 payload: br#"{"state":"abc"}"#.to_vec(),
4933 now_ms: 0,
4934 }),
4935 StreamResponse::SnapshotPublished { snapshot_offset: 3 }
4936 );
4937 assert!(matches!(
4938 machine.read_plan(&stream("snap"), 0, 1),
4939 Err(StreamResponse::Error {
4940 code: StreamErrorCode::StreamGone,
4941 next_offset: Some(3),
4942 ..
4943 })
4944 ));
4945 let read = machine.read(&stream("snap"), 3, 2).expect("retained read");
4946 assert_eq!(read.payload, b"de");
4947 let snapshot = machine
4948 .read_snapshot(&stream("snap"), 3)
4949 .expect("visible snapshot");
4950 assert_eq!(snapshot.content_type, "application/json");
4951 assert_eq!(snapshot.payload, br#"{"state":"abc"}"#);
4952 let bootstrap = machine.bootstrap_plan(&stream("snap")).expect("bootstrap");
4953 assert_eq!(
4954 bootstrap.snapshot.as_ref().map(|snapshot| snapshot.offset),
4955 Some(3)
4956 );
4957 assert_eq!(
4958 bootstrap.updates,
4959 vec![StreamMessageRecord {
4960 start_offset: 3,
4961 end_offset: 5,
4962 }]
4963 );
4964 }
4965
4966 #[test]
4967 fn publish_snapshot_rejects_unaligned_offset() {
4968 let mut machine = StreamStateMachine::new();
4969 create_bucket(&mut machine);
4970 create_stream(&mut machine, "unaligned");
4971 assert!(matches!(
4972 machine.apply(StreamCommand::Append {
4973 stream_id: stream("unaligned"),
4974 content_type: Some("application/octet-stream".to_owned()),
4975 payload: b"abc".to_vec(),
4976 close_after: false,
4977 stream_seq: None,
4978 producer: None,
4979 now_ms: 0,
4980 }),
4981 StreamResponse::Appended { .. }
4982 ));
4983 assert!(matches!(
4984 machine.apply(StreamCommand::PublishSnapshot {
4985 stream_id: stream("unaligned"),
4986 snapshot_offset: 2,
4987 content_type: "application/octet-stream".to_owned(),
4988 payload: b"ab".to_vec(),
4989 now_ms: 0,
4990 }),
4991 StreamResponse::Error {
4992 code: StreamErrorCode::InvalidSnapshot,
4993 next_offset: Some(3),
4994 ..
4995 }
4996 ));
4997 }
4998
4999 #[test]
5000 fn snapshot_restore_preserves_visible_snapshot_and_message_records() {
5001 let mut machine = StreamStateMachine::new();
5002 create_bucket(&mut machine);
5003 create_stream(&mut machine, "restore-snap");
5004 let _ = machine.apply(StreamCommand::Append {
5005 stream_id: stream("restore-snap"),
5006 content_type: Some("application/octet-stream".to_owned()),
5007 payload: b"abc".to_vec(),
5008 close_after: false,
5009 stream_seq: None,
5010 producer: None,
5011 now_ms: 0,
5012 });
5013 let _ = machine.apply(StreamCommand::Append {
5014 stream_id: stream("restore-snap"),
5015 content_type: Some("application/octet-stream".to_owned()),
5016 payload: b"de".to_vec(),
5017 close_after: false,
5018 stream_seq: None,
5019 producer: None,
5020 now_ms: 0,
5021 });
5022 let _ = machine.apply(StreamCommand::PublishSnapshot {
5023 stream_id: stream("restore-snap"),
5024 snapshot_offset: 3,
5025 content_type: "application/octet-stream".to_owned(),
5026 payload: b"abc-state".to_vec(),
5027 now_ms: 0,
5028 });
5029
5030 let restored = StreamStateMachine::restore(machine.snapshot()).expect("restore");
5031 assert_eq!(
5032 restored
5033 .read_snapshot(&stream("restore-snap"), 3)
5034 .expect("snapshot")
5035 .payload,
5036 b"abc-state"
5037 );
5038 assert_eq!(
5039 restored
5040 .bootstrap_plan(&stream("restore-snap"))
5041 .expect("bootstrap")
5042 .updates,
5043 vec![StreamMessageRecord {
5044 start_offset: 3,
5045 end_offset: 5,
5046 }]
5047 );
5048 }
5049}