Skip to main content

ursula_stream/
lib.rs

1use std::collections::{HashMap, HashSet};
2
3use serde::{Deserialize, Serialize};
4use ursula_proto::{ColdChunkRefV1, ExternalPayloadRefV1, ProducerRequestV1};
5use ursula_shard::BucketStreamId;
6
/// Commands accepted by [`StreamStateMachine::apply`].
///
/// `apply` matches on the variant and forwards its fields to the handler named in
/// each variant's documentation below.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamCommand {
    /// Forwarded to `create_bucket`.
    CreateBucket {
        bucket_id: String,
    },
    /// Forwarded to `delete_bucket`.
    DeleteBucket {
        bucket_id: String,
    },
    /// Forwarded to `create_stream` (as a `CreateStreamInput`) with an inline initial payload.
    CreateStream {
        stream_id: BucketStreamId,
        content_type: String,
        initial_payload: Vec<u8>,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
        forked_from: Option<BucketStreamId>,
        fork_offset: Option<u64>,
        now_ms: u64,
    },
    /// Forwarded to `create_external_stream`; same fields as `CreateStream`, except the
    /// initial payload is an [`ExternalPayloadRef`] rather than inline bytes.
    CreateExternal {
        stream_id: BucketStreamId,
        content_type: String,
        initial_payload: ExternalPayloadRef,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
        forked_from: Option<BucketStreamId>,
        fork_offset: Option<u64>,
        now_ms: u64,
    },
    /// Forwarded to `append_borrowed` as an [`AppendStreamInput`] borrowing the payload.
    Append {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payload: Vec<u8>,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Forwarded to `append_external`; the payload is an [`ExternalPayloadRef`].
    AppendExternal {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payload: ExternalPayloadRef,
        close_after: bool,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Forwarded to `append_batch_borrowed`; `apply` reports only the last item of the
    /// resulting batch in its response.
    AppendBatch {
        stream_id: BucketStreamId,
        content_type: Option<String>,
        payloads: Vec<Vec<u8>>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Forwarded to `publish_snapshot`.
    PublishSnapshot {
        stream_id: BucketStreamId,
        snapshot_offset: u64,
        content_type: String,
        payload: Vec<u8>,
        now_ms: u64,
    },
    /// Forwarded to `touch_stream_access`.
    TouchStreamAccess {
        stream_id: BucketStreamId,
        now_ms: u64,
        renew_ttl: bool,
    },
    /// Forwarded to `add_fork_ref`.
    AddForkRef {
        stream_id: BucketStreamId,
        now_ms: u64,
    },
    /// Forwarded to `release_fork_ref`.
    ReleaseForkRef {
        stream_id: BucketStreamId,
    },
    /// Forwarded to `flush_cold` with the chunk reference to record.
    FlushCold {
        stream_id: BucketStreamId,
        chunk: ColdChunkRef,
    },
    /// Forwarded to `close`.
    Close {
        stream_id: BucketStreamId,
        stream_seq: Option<String>,
        producer: Option<ProducerRequest>,
        now_ms: u64,
    },
    /// Forwarded to `delete_stream`.
    DeleteStream {
        stream_id: BucketStreamId,
    },
}
99
/// Result of applying a [`StreamCommand`] to the state machine.
///
/// Several read-only planning methods also use this type as their error value; in
/// that role they build the `Error` variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamResponse {
    BucketCreated {
        bucket_id: String,
    },
    BucketAlreadyExists {
        bucket_id: String,
    },
    BucketDeleted {
        bucket_id: String,
    },
    Created {
        stream_id: BucketStreamId,
        next_offset: u64,
        closed: bool,
    },
    AlreadyExists {
        next_offset: u64,
        closed: bool,
        content_type: String,
        stream_ttl_seconds: Option<u64>,
        stream_expires_at_ms: Option<u64>,
    },
    /// An append outcome. For `StreamCommand::AppendBatch`, `apply` fills this from the
    /// last item of the batch and sets `producer` to `None`.
    Appended {
        offset: u64,
        next_offset: u64,
        closed: bool,
        deduplicated: bool,
        producer: Option<ProducerRequest>,
    },
    Closed {
        next_offset: u64,
        deduplicated: bool,
        producer: Option<ProducerRequest>,
    },
    Deleted {
        hard_deleted: bool,
        parent_to_release: Option<BucketStreamId>,
    },
    ForkRefAdded {
        fork_ref_count: u64,
    },
    ForkRefReleased {
        hard_deleted: bool,
        fork_ref_count: u64,
        parent_to_release: Option<BucketStreamId>,
    },
    /// The response `plan_next_cold_flush_batch` expects from a successful `flush_cold`.
    ColdFlushed {
        hot_start_offset: u64,
    },
    SnapshotPublished {
        snapshot_offset: u64,
    },
    Accessed {
        changed: bool,
        expired: bool,
    },
    /// A failure. Built through `StreamResponse::error` (no offset) or
    /// `StreamResponse::error_with_next_offset` (offset attached).
    Error {
        code: StreamErrorCode,
        message: String,
        next_offset: Option<u64>,
    },
}
163
/// Machine-readable category carried by [`StreamResponse::Error`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamErrorCode {
    InvalidBucketId,
    InvalidStreamId,
    BucketNotFound,
    BucketNotEmpty,
    /// No metadata is tracked for the requested stream id.
    StreamNotFound,
    /// The stream is still tracked but is soft-deleted.
    StreamGone,
    StreamAlreadyExistsConflict,
    MissingContentType,
    ContentTypeMismatch,
    /// Used by `apply` when an append batch yields no items.
    EmptyAppend,
    StreamClosed,
    StreamSeqConflict,
    InvalidProducer,
    ProducerEpochStale,
    ProducerSeqConflict,
    InvalidRetention,
    InvalidFork,
    OffsetOutOfRange,
    /// Also used by `plan_next_cold_flush_batch` when `flush_cold` returns an unexpected
    /// response kind.
    InvalidColdFlush,
    InvalidSnapshot,
    SnapshotNotFound,
    SnapshotConflict,
}
189
190impl StreamResponse {
191    fn error(code: StreamErrorCode, message: impl Into<String>) -> Self {
192        Self::Error {
193            code,
194            message: message.into(),
195            next_offset: None,
196        }
197    }
198
199    fn error_with_next_offset(
200        code: StreamErrorCode,
201        message: impl Into<String>,
202        next_offset: u64,
203    ) -> Self {
204        Self::Error {
205            code,
206            message: message.into(),
207            next_offset: Some(next_offset),
208        }
209    }
210}
211
/// Lifecycle state stored in [`StreamMetadata::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamStatus {
    Open,
    Closed,
    // NOTE(review): presumably the state `is_soft_deleted` tests for — confirm against that helper.
    SoftDeleted,
}
218
/// Per-stream metadata kept by the state machine and included in snapshots.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamMetadata {
    pub stream_id: BucketStreamId,
    pub content_type: String,
    pub status: StreamStatus,
    /// Upper bound checked on restore: a visible snapshot's offset may not exceed it.
    pub tail_offset: u64,
    pub last_stream_seq: Option<String>,
    /// When `None`, `access_requires_write` never requests a TTL-renewal write.
    pub stream_ttl_seconds: Option<u64>,
    pub stream_expires_at_ms: Option<u64>,
    pub created_at_ms: u64,
    /// Compared with the caller's `now_ms` in `access_requires_write`; an equal value
    /// means no renewal write is needed.
    pub last_ttl_touch_at_ms: u64,
    pub forked_from: Option<BucketStreamId>,
    pub fork_offset: Option<u64>,
    pub fork_ref_count: u64,
}
234
/// Local alias for the `ursula_proto` producer request type (`ProducerRequestV1`).
pub type ProducerRequest = ProducerRequestV1;
236
/// Borrowed arguments for `append_borrowed`.
///
/// `apply` builds this for `StreamCommand::Append`, borrowing the command's content
/// type and payload rather than moving them.
#[derive(Debug)]
pub struct AppendStreamInput<'a> {
    pub stream_id: BucketStreamId,
    pub content_type: Option<&'a str>,
    pub payload: &'a [u8],
    pub close_after: bool,
    pub stream_seq: Option<String>,
    pub producer: Option<ProducerRequest>,
    pub now_ms: u64,
}
247
/// Arguments for `append_external`.
///
/// `apply` builds this for `StreamCommand::AppendExternal`; unlike
/// [`AppendStreamInput`], the payload is an owned [`ExternalPayloadRef`].
#[derive(Debug)]
struct AppendExternalInput<'a> {
    stream_id: BucketStreamId,
    content_type: Option<&'a str>,
    payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    now_ms: u64,
}
258
/// Serializable producer record stored in [`StreamSnapshotEntry::producer_states`].
///
/// Carries the same fields as the private `ProducerState` plus the `producer_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerSnapshot {
    pub producer_id: String,
    pub producer_epoch: u64,
    pub producer_seq: u64,
    pub last_start_offset: u64,
    pub last_next_offset: u64,
    pub last_closed: bool,
    pub last_items: Vec<ProducerAppendRecord>,
}
269
/// One entry of the `last_items` list kept in `ProducerSnapshot` and `ProducerState`.
// NOTE(review): presumably one record per item of the producer's last append — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerAppendRecord {
    pub start_offset: u64,
    pub next_offset: u64,
    pub closed: bool,
}
276
/// In-memory per-producer state.
///
/// `StreamStateMachine::producers` holds one of these per stream and per producer key.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ProducerState {
    producer_epoch: u64,
    producer_seq: u64,
    last_start_offset: u64,
    last_next_offset: u64,
    last_closed: bool,
    last_items: Vec<ProducerAppendRecord>,
}
286
/// Outcome of a batch append: one [`StreamBatchAppendItem`] per entry plus a
/// batch-level `deduplicated` flag.
// NOTE(review): appears to be the `Ok` value of `append_batch_borrowed` — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBatchAppend {
    pub items: Vec<StreamBatchAppendItem>,
    pub deduplicated: bool,
}
292
/// Result for a single entry of a batch append.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBatchAppendItem {
    pub offset: u64,
    pub next_offset: u64,
    pub closed: bool,
    pub deduplicated: bool,
}
300
/// A read result carrying payload bytes together with offsets and stream flags.
// NOTE(review): the producer of this type is not visible in this part of the file;
// field semantics should be confirmed against the read path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamRead {
    pub offset: u64,
    pub next_offset: u64,
    pub content_type: String,
    pub payload: Vec<u8>,
    pub up_to_date: bool,
    pub closed: bool,
}
310
/// Local alias for the `ursula_proto` cold chunk reference type (`ColdChunkRefV1`).
pub type ColdChunkRef = ColdChunkRefV1;
312
/// Reference to payload bytes held in an external object.
///
/// Can be built from a [`ColdChunkRef`], which supplies the same four fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObjectPayloadRef {
    pub start_offset: u64,
    pub end_offset: u64,
    pub s3_path: String,
    pub object_size: u64,
}
320
321impl From<&ColdChunkRef> for ObjectPayloadRef {
322    fn from(chunk: &ColdChunkRef) -> Self {
323        Self {
324            start_offset: chunk.start_offset,
325            end_offset: chunk.end_offset,
326            s3_path: chunk.s3_path.clone(),
327            object_size: chunk.object_size,
328        }
329    }
330}
331
/// Local alias for the `ursula_proto` external payload reference type (`ExternalPayloadRefV1`).
pub type ExternalPayloadRef = ExternalPayloadRefV1;
333
/// Maps a stream offset range onto a byte range of the stream's in-memory payload.
///
/// `plan_cold_flush` slices the payload vector with `payload_start..payload_end` and
/// treats segments as contiguous when the next `start_offset` equals the running end
/// offset, so both ranges behave as half-open.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HotPayloadSegment {
    pub start_offset: u64,
    pub end_offset: u64,
    pub payload_start: usize,
    pub payload_end: usize,
}
341
/// A contiguous run of hot bytes selected by `plan_cold_flush` for flushing.
///
/// `payload` is a copy of the selected bytes; `end_offset` is `start_offset` advanced
/// by the number of bytes taken.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColdFlushCandidate {
    pub stream_id: BucketStreamId,
    pub start_offset: u64,
    pub end_offset: u64,
    pub payload: Vec<u8>,
}
349
/// A portion of a read that is served from a cold chunk.
// NOTE(review): presumably `len` bytes starting at `read_start_offset` within `chunk` — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadColdSegment {
    pub chunk: ColdChunkRef,
    pub read_start_offset: u64,
    pub len: usize,
}
356
/// A portion of a read that is served from an external object.
// NOTE(review): presumably `len` bytes starting at `read_start_offset` within `object` — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadObjectSegment {
    pub object: ObjectPayloadRef,
    pub read_start_offset: u64,
    pub len: usize,
}
363
/// One piece of a [`StreamReadPlan`]: either an object-backed range or inline bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamReadSegment {
    /// Bytes that must be fetched from the referenced object.
    Object(StreamReadObjectSegment),
    /// Bytes carried inline.
    Hot(Vec<u8>),
}
369
/// A read described as an ordered list of [`StreamReadSegment`]s plus offsets and stream flags.
// NOTE(review): the planner that builds this is not visible in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamReadPlan {
    pub offset: u64,
    pub next_offset: u64,
    pub content_type: String,
    pub segments: Vec<StreamReadSegment>,
    pub up_to_date: bool,
    pub closed: bool,
}
379
/// Offset range of one message within a stream.
///
/// On restore, the records of a stream are checked by
/// `message_records_cover_retained_suffix` against the retained offset and tail offset.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamMessageRecord {
    pub start_offset: u64,
    pub end_offset: u64,
}
385
/// The snapshot currently visible for a stream.
///
/// On restore, `offset` must not exceed the stream's tail offset; it is also used as
/// the retained offset for the payload and message-record coverage checks.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamVisibleSnapshot {
    pub offset: u64,
    pub content_type: String,
    pub payload: Vec<u8>,
}
392
/// An optional visible snapshot together with a list of message records and stream flags.
// NOTE(review): presumably the snapshot plus the updates that follow it — the builder
// is not visible in this part of the file; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamBootstrapPlan {
    pub snapshot: Option<StreamVisibleSnapshot>,
    pub updates: Vec<StreamMessageRecord>,
    pub next_offset: u64,
    pub content_type: String,
    pub up_to_date: bool,
    pub closed: bool,
}
402
/// Serializable dump of the whole state machine.
///
/// Produced by `StreamStateMachine::snapshot` (buckets sorted, streams ordered with
/// `compare_stream_ids`) and consumed by `StreamStateMachine::restore`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamSnapshot {
    pub buckets: Vec<String>,
    pub streams: Vec<StreamSnapshotEntry>,
}
408
/// Everything the state machine stores for one stream, as written by `snapshot`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamSnapshotEntry {
    pub metadata: StreamMetadata,
    pub hot_start_offset: u64,
    /// Copy of the stream's in-memory payload bytes.
    pub payload: Vec<u8>,
    /// If this is empty while `payload` is not, `restore` synthesizes a single segment
    /// spanning `hot_start_offset..metadata.tail_offset` over the whole payload.
    pub hot_segments: Vec<HotPayloadSegment>,
    pub cold_chunks: Vec<ColdChunkRef>,
    pub external_segments: Vec<ObjectPayloadRef>,
    pub message_records: Vec<StreamMessageRecord>,
    pub visible_snapshot: Option<StreamVisibleSnapshot>,
    pub producer_states: Vec<ProducerSnapshot>,
}
421
/// Validation failures reported by `StreamStateMachine::restore`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamSnapshotError {
    /// The same bucket id appears more than once in the snapshot.
    DuplicateBucket(String),
    /// The same stream id appears more than once in the snapshot.
    DuplicateStream(BucketStreamId),
    /// A stream lists the same producer more than once.
    DuplicateProducer {
        stream_id: BucketStreamId,
        producer_id: String,
    },
    /// A stream's bucket id is not among the snapshot's buckets.
    MissingBucket(BucketStreamId),
    /// The hot segments do not match the payload, or the payload sources do not cover
    /// the retained suffix up to the tail offset.
    PayloadLengthMismatch {
        stream_id: BucketStreamId,
        tail_offset: u64,
        payload_len: usize,
    },
    /// The message records do not cover the retained suffix up to the tail offset.
    MessageBoundaryMismatch {
        stream_id: BucketStreamId,
    },
    /// The visible snapshot's offset is greater than the stream's tail offset.
    SnapshotOffsetOutOfRange {
        stream_id: BucketStreamId,
        snapshot_offset: u64,
        tail_offset: u64,
    },
}
445
446impl std::fmt::Display for StreamSnapshotError {
447    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448        match self {
449            Self::DuplicateBucket(bucket_id) => {
450                write!(f, "snapshot contains duplicate bucket '{bucket_id}'")
451            }
452            Self::DuplicateStream(stream_id) => {
453                write!(f, "snapshot contains duplicate stream '{stream_id}'")
454            }
455            Self::DuplicateProducer {
456                stream_id,
457                producer_id,
458            } => write!(
459                f,
460                "snapshot stream '{stream_id}' contains duplicate producer '{producer_id}'"
461            ),
462            Self::MissingBucket(stream_id) => {
463                write!(
464                    f,
465                    "snapshot stream '{stream_id}' references a missing bucket"
466                )
467            }
468            Self::PayloadLengthMismatch {
469                stream_id,
470                tail_offset,
471                payload_len,
472            } => write!(
473                f,
474                "snapshot stream '{stream_id}' tail offset {tail_offset} does not match payload length {payload_len}"
475            ),
476            Self::MessageBoundaryMismatch { stream_id } => write!(
477                f,
478                "snapshot stream '{stream_id}' has inconsistent message boundaries"
479            ),
480            Self::SnapshotOffsetOutOfRange {
481                stream_id,
482                snapshot_offset,
483                tail_offset,
484            } => write!(
485                f,
486                "snapshot stream '{stream_id}' visible snapshot offset {snapshot_offset} is beyond tail offset {tail_offset}"
487            ),
488        }
489    }
490}
491
// Marker impl: relies on the `Debug` and `Display` impls above and overrides nothing.
impl std::error::Error for StreamSnapshotError {}
493
/// In-memory state for buckets and streams, mutated through [`StreamStateMachine::apply`].
///
/// Per-stream data is split across several maps keyed by [`BucketStreamId`]. Methods
/// in this file expect a `payloads` entry to exist for every entry in `streams`.
#[derive(Debug, Clone, Default)]
pub struct StreamStateMachine {
    /// Ids of all known buckets.
    buckets: HashSet<String>,
    /// Metadata per stream.
    streams: HashMap<BucketStreamId, StreamMetadata>,
    /// In-memory ("hot") payload bytes per stream.
    payloads: HashMap<BucketStreamId, Vec<u8>>,
    /// Offset-to-payload-range mapping for the hot bytes of each stream.
    hot_segments: HashMap<BucketStreamId, Vec<HotPayloadSegment>>,
    /// Hot start offset per stream; a missing entry reads as 0.
    hot_start_offsets: HashMap<BucketStreamId, u64>,
    /// Cold chunk references per stream.
    cold_chunks: HashMap<BucketStreamId, Vec<ColdChunkRef>>,
    /// External object references per stream.
    external_segments: HashMap<BucketStreamId, Vec<ObjectPayloadRef>>,
    /// Message offset ranges per stream.
    message_records: HashMap<BucketStreamId, Vec<StreamMessageRecord>>,
    /// Currently visible snapshot per stream, if any.
    visible_snapshots: HashMap<BucketStreamId, StreamVisibleSnapshot>,
    /// Producer state per stream, keyed by a producer string.
    producers: HashMap<BucketStreamId, HashMap<String, ProducerState>>,
}
507
508impl StreamStateMachine {
509    pub fn new() -> Self {
510        Self::default()
511    }
512
513    pub fn apply(&mut self, command: StreamCommand) -> StreamResponse {
514        match command {
515            StreamCommand::CreateBucket { bucket_id } => self.create_bucket(bucket_id),
516            StreamCommand::DeleteBucket { bucket_id } => self.delete_bucket(&bucket_id),
517            StreamCommand::CreateStream {
518                stream_id,
519                content_type,
520                initial_payload,
521                close_after,
522                stream_seq,
523                producer,
524                stream_ttl_seconds,
525                stream_expires_at_ms,
526                forked_from,
527                fork_offset,
528                now_ms,
529            } => self.create_stream(CreateStreamInput {
530                stream_id,
531                content_type,
532                initial_payload,
533                close_after,
534                stream_seq,
535                producer,
536                stream_ttl_seconds,
537                stream_expires_at_ms,
538                forked_from,
539                fork_offset,
540                now_ms,
541            }),
542            StreamCommand::CreateExternal {
543                stream_id,
544                content_type,
545                initial_payload,
546                close_after,
547                stream_seq,
548                producer,
549                stream_ttl_seconds,
550                stream_expires_at_ms,
551                forked_from,
552                fork_offset,
553                now_ms,
554            } => self.create_external_stream(CreateExternalStreamInput {
555                stream_id,
556                content_type,
557                initial_payload,
558                close_after,
559                stream_seq,
560                producer,
561                stream_ttl_seconds,
562                stream_expires_at_ms,
563                forked_from,
564                fork_offset,
565                now_ms,
566            }),
567            StreamCommand::Append {
568                stream_id,
569                content_type,
570                payload,
571                close_after,
572                stream_seq,
573                producer,
574                now_ms,
575            } => self.append_borrowed(AppendStreamInput {
576                stream_id,
577                content_type: content_type.as_deref(),
578                payload: &payload,
579                close_after,
580                stream_seq,
581                producer,
582                now_ms,
583            }),
584            StreamCommand::AppendExternal {
585                stream_id,
586                content_type,
587                payload,
588                close_after,
589                stream_seq,
590                producer,
591                now_ms,
592            } => self.append_external(AppendExternalInput {
593                stream_id,
594                content_type: content_type.as_deref(),
595                payload,
596                close_after,
597                stream_seq,
598                producer,
599                now_ms,
600            }),
601            StreamCommand::AppendBatch {
602                stream_id,
603                content_type,
604                payloads,
605                producer,
606                now_ms,
607            } => match self.append_batch_borrowed(
608                stream_id,
609                content_type.as_deref(),
610                &payloads.iter().map(Vec::as_slice).collect::<Vec<_>>(),
611                producer,
612                now_ms,
613            ) {
614                Ok(batch) => batch
615                    .items
616                    .last()
617                    .map(|item| StreamResponse::Appended {
618                        offset: item.offset,
619                        next_offset: item.next_offset,
620                        closed: item.closed,
621                        deduplicated: item.deduplicated,
622                        producer: None,
623                    })
624                    .unwrap_or_else(|| {
625                        StreamResponse::error(
626                            StreamErrorCode::EmptyAppend,
627                            "append batch must contain at least one payload",
628                        )
629                    }),
630                Err(response) => response,
631            },
632            StreamCommand::PublishSnapshot {
633                stream_id,
634                snapshot_offset,
635                content_type,
636                payload,
637                now_ms,
638            } => self.publish_snapshot(stream_id, snapshot_offset, content_type, payload, now_ms),
639            StreamCommand::TouchStreamAccess {
640                stream_id,
641                now_ms,
642                renew_ttl,
643            } => self.touch_stream_access(&stream_id, now_ms, renew_ttl),
644            StreamCommand::AddForkRef { stream_id, now_ms } => {
645                self.add_fork_ref(&stream_id, now_ms)
646            }
647            StreamCommand::ReleaseForkRef { stream_id } => self.release_fork_ref(&stream_id),
648            StreamCommand::FlushCold { stream_id, chunk } => self.flush_cold(stream_id, chunk),
649            StreamCommand::Close {
650                stream_id,
651                stream_seq,
652                producer,
653                now_ms,
654            } => self.close(stream_id, stream_seq, producer, now_ms),
655            StreamCommand::DeleteStream { stream_id } => self.delete_stream(&stream_id),
656        }
657    }
658
659    pub fn head(&self, stream_id: &BucketStreamId) -> Option<&StreamMetadata> {
660        self.streams.get(stream_id)
661    }
662
663    pub fn head_at(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> Option<&StreamMetadata> {
664        self.expire_stream_if_due(stream_id, now_ms);
665        self.streams.get(stream_id)
666    }
667
668    pub fn access_requires_write(
669        &self,
670        stream_id: &BucketStreamId,
671        now_ms: u64,
672        renew_ttl: bool,
673    ) -> Result<bool, StreamResponse> {
674        self.validate_stream_scope(stream_id)?;
675        let Some(stream) = self.streams.get(stream_id) else {
676            return Err(StreamResponse::error(
677                StreamErrorCode::StreamNotFound,
678                format!("stream '{stream_id}' does not exist"),
679            ));
680        };
681        if is_soft_deleted(stream) {
682            return Err(StreamResponse::error(
683                StreamErrorCode::StreamGone,
684                format!("stream '{stream_id}' is gone"),
685            ));
686        }
687        if stream_is_expired(stream, now_ms) {
688            return Ok(true);
689        }
690        Ok(renew_ttl
691            && stream.stream_ttl_seconds.is_some()
692            && stream.last_ttl_touch_at_ms != now_ms)
693    }
694
695    pub fn hot_start_offset(&self, stream_id: &BucketStreamId) -> u64 {
696        self.hot_start_offsets.get(stream_id).copied().unwrap_or(0)
697    }
698
699    pub fn cold_chunks(&self, stream_id: &BucketStreamId) -> &[ColdChunkRef] {
700        self.cold_chunks
701            .get(stream_id)
702            .map(Vec::as_slice)
703            .unwrap_or(&[])
704    }
705
706    pub fn external_segments(&self, stream_id: &BucketStreamId) -> &[ObjectPayloadRef] {
707        self.external_segments
708            .get(stream_id)
709            .map(Vec::as_slice)
710            .unwrap_or(&[])
711    }
712
713    pub fn hot_segments(&self, stream_id: &BucketStreamId) -> &[HotPayloadSegment] {
714        self.hot_segments
715            .get(stream_id)
716            .map(Vec::as_slice)
717            .unwrap_or(&[])
718    }
719
720    pub fn hot_payload_len(&self, stream_id: &BucketStreamId) -> Result<u64, StreamResponse> {
721        let Some(stream) = self.streams.get(stream_id) else {
722            return Err(StreamResponse::error(
723                StreamErrorCode::StreamNotFound,
724                format!("stream '{stream_id}' does not exist"),
725            ));
726        };
727        if is_soft_deleted(stream) {
728            return Err(StreamResponse::error(
729                StreamErrorCode::StreamGone,
730                format!("stream '{stream_id}' is gone"),
731            ));
732        }
733        let payload = self
734            .payloads
735            .get(stream_id)
736            .expect("payload vector exists for stream metadata");
737        Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
738    }
739
740    pub fn total_hot_payload_bytes(&self) -> u64 {
741        self.payloads
742            .values()
743            .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
744            .sum()
745    }
746
747    pub fn plan_cold_flush(
748        &self,
749        stream_id: &BucketStreamId,
750        min_hot_bytes: usize,
751        max_flush_bytes: usize,
752    ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
753        if max_flush_bytes == 0 {
754            return Ok(None);
755        }
756        let Some(stream) = self.streams.get(stream_id) else {
757            return Err(StreamResponse::error(
758                StreamErrorCode::StreamNotFound,
759                format!("stream '{stream_id}' does not exist"),
760            ));
761        };
762        if is_soft_deleted(stream) {
763            return Err(StreamResponse::error(
764                StreamErrorCode::StreamGone,
765                format!("stream '{stream_id}' is gone"),
766            ));
767        }
768        let Some(first_segment) = self.hot_segments(stream_id).first() else {
769            return Ok(None);
770        };
771        let mut payload_end = first_segment.payload_start;
772        let mut end_offset = first_segment.start_offset;
773        let mut flush_len = 0usize;
774        for segment in self.hot_segments(stream_id) {
775            if segment.start_offset != end_offset || segment.payload_start != payload_end {
776                break;
777            }
778            let remaining = max_flush_bytes.saturating_sub(flush_len);
779            if remaining == 0 {
780                break;
781            }
782            let segment_len = segment.payload_end - segment.payload_start;
783            let take = segment_len.min(remaining);
784            flush_len += take;
785            payload_end += take;
786            end_offset = end_offset.saturating_add(u64::try_from(take).expect("take fits u64"));
787            if take < segment_len {
788                break;
789            }
790        }
791        if flush_len < min_hot_bytes {
792            return Ok(None);
793        }
794        let payload = self
795            .payloads
796            .get(stream_id)
797            .expect("payload vector exists for stream metadata");
798        let start_offset = first_segment.start_offset;
799        Ok(Some(ColdFlushCandidate {
800            stream_id: stream_id.clone(),
801            start_offset,
802            end_offset,
803            payload: payload[first_segment.payload_start..payload_end].to_vec(),
804        }))
805    }
806
807    pub fn plan_next_cold_flush(
808        &self,
809        min_hot_bytes: usize,
810        max_flush_bytes: usize,
811    ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
812        if max_flush_bytes == 0 {
813            return Ok(None);
814        }
815        let mut stream_ids = self.streams.keys().cloned().collect::<Vec<_>>();
816        stream_ids.sort_by(compare_stream_ids);
817        for stream_id in stream_ids {
818            match self.plan_cold_flush(&stream_id, min_hot_bytes, max_flush_bytes) {
819                Ok(Some(candidate)) => return Ok(Some(candidate)),
820                Ok(None) => {}
821                Err(StreamResponse::Error {
822                    code: StreamErrorCode::StreamGone | StreamErrorCode::StreamNotFound,
823                    ..
824                }) => {}
825                Err(err) => return Err(err),
826            }
827        }
828        Ok(None)
829    }
830
831    pub fn plan_next_cold_flush_batch(
832        &self,
833        min_hot_bytes: usize,
834        max_flush_bytes: usize,
835        max_candidates: usize,
836    ) -> Result<Vec<ColdFlushCandidate>, StreamResponse> {
837        if max_candidates == 0 || max_flush_bytes == 0 {
838            return Ok(Vec::new());
839        }
840        let mut preview = self.clone();
841        let mut candidates = Vec::with_capacity(max_candidates);
842        while candidates.len() < max_candidates {
843            let Some(candidate) = preview.plan_next_cold_flush(min_hot_bytes, max_flush_bytes)?
844            else {
845                break;
846            };
847            let chunk = ColdChunkRef {
848                start_offset: candidate.start_offset,
849                end_offset: candidate.end_offset,
850                s3_path: "planned-cold-flush-batch".to_owned(),
851                object_size: u64::try_from(candidate.payload.len()).expect("payload len fits u64"),
852            };
853            match preview.flush_cold(candidate.stream_id.clone(), chunk) {
854                StreamResponse::ColdFlushed { .. } => candidates.push(candidate),
855                StreamResponse::Error { .. } => break,
856                other => {
857                    return Err(StreamResponse::error(
858                        StreamErrorCode::InvalidColdFlush,
859                        format!("unexpected cold flush planning response: {other:?}"),
860                    ));
861                }
862            }
863        }
864        Ok(candidates)
865    }
866
867    pub fn bucket_exists(&self, bucket_id: &str) -> bool {
868        self.buckets.contains(bucket_id)
869    }
870
871    pub fn snapshot(&self) -> StreamSnapshot {
872        let mut buckets = self.buckets.iter().cloned().collect::<Vec<_>>();
873        buckets.sort();
874
875        let mut streams = self
876            .streams
877            .values()
878            .cloned()
879            .map(|metadata| {
880                let stream_id = metadata.stream_id.clone();
881                let payload = self
882                    .payloads
883                    .get(&stream_id)
884                    .expect("payload vector exists for stream metadata")
885                    .clone();
886                let producer_states = self.producer_snapshot(&stream_id);
887                StreamSnapshotEntry {
888                    metadata,
889                    hot_start_offset: self.hot_start_offset(&stream_id),
890                    payload,
891                    hot_segments: self
892                        .hot_segments
893                        .get(&stream_id)
894                        .cloned()
895                        .unwrap_or_default(),
896                    cold_chunks: self
897                        .cold_chunks
898                        .get(&stream_id)
899                        .cloned()
900                        .unwrap_or_default(),
901                    external_segments: self
902                        .external_segments
903                        .get(&stream_id)
904                        .cloned()
905                        .unwrap_or_default(),
906                    message_records: self
907                        .message_records
908                        .get(&stream_id)
909                        .cloned()
910                        .unwrap_or_default(),
911                    visible_snapshot: self.visible_snapshots.get(&stream_id).cloned(),
912                    producer_states,
913                }
914            })
915            .collect::<Vec<_>>();
916        streams.sort_by(|left, right| {
917            compare_stream_ids(&left.metadata.stream_id, &right.metadata.stream_id)
918        });
919
920        StreamSnapshot { buckets, streams }
921    }
922
923    pub fn restore(snapshot: StreamSnapshot) -> Result<Self, StreamSnapshotError> {
924        let mut machine = Self::default();
925        for bucket_id in snapshot.buckets {
926            if !machine.buckets.insert(bucket_id.clone()) {
927                return Err(StreamSnapshotError::DuplicateBucket(bucket_id));
928            }
929        }
930
931        for entry in snapshot.streams {
932            let stream_id = entry.metadata.stream_id.clone();
933            if !machine.buckets.contains(&stream_id.bucket_id) {
934                return Err(StreamSnapshotError::MissingBucket(stream_id));
935            }
936            if let Some(snapshot) = entry.visible_snapshot.as_ref()
937                && snapshot.offset > entry.metadata.tail_offset
938            {
939                return Err(StreamSnapshotError::SnapshotOffsetOutOfRange {
940                    stream_id,
941                    snapshot_offset: snapshot.offset,
942                    tail_offset: entry.metadata.tail_offset,
943                });
944            }
945            let retained_offset = entry
946                .visible_snapshot
947                .as_ref()
948                .map(|snapshot| snapshot.offset)
949                .unwrap_or(0);
950            let hot_segments = if entry.hot_segments.is_empty() && !entry.payload.is_empty() {
951                vec![HotPayloadSegment {
952                    start_offset: entry.hot_start_offset,
953                    end_offset: entry.metadata.tail_offset,
954                    payload_start: 0,
955                    payload_end: entry.payload.len(),
956                }]
957            } else {
958                entry.hot_segments
959            };
960            if !hot_segments_match_payload(&hot_segments, entry.payload.len())
961                || !payload_sources_cover_retained_suffix(
962                    &entry.cold_chunks,
963                    &entry.external_segments,
964                    &hot_segments,
965                    retained_offset,
966                    entry.metadata.tail_offset,
967                )
968            {
969                return Err(StreamSnapshotError::PayloadLengthMismatch {
970                    stream_id,
971                    tail_offset: entry.metadata.tail_offset,
972                    payload_len: entry.payload.len(),
973                });
974            }
975            if !message_records_cover_retained_suffix(
976                &entry.message_records,
977                retained_offset,
978                entry.metadata.tail_offset,
979            ) {
980                return Err(StreamSnapshotError::MessageBoundaryMismatch { stream_id });
981            }
982            if machine
983                .streams
984                .insert(entry.metadata.stream_id.clone(), entry.metadata)
985                .is_some()
986            {
987                return Err(StreamSnapshotError::DuplicateStream(stream_id));
988            }
989            let producer_states = restore_producer_states(&stream_id, entry.producer_states)?;
990            if !hot_segments.is_empty() {
991                machine.hot_segments.insert(stream_id.clone(), hot_segments);
992            }
993            if !entry.cold_chunks.is_empty() {
994                machine
995                    .cold_chunks
996                    .insert(stream_id.clone(), entry.cold_chunks);
997            }
998            if !entry.external_segments.is_empty() {
999                machine
1000                    .external_segments
1001                    .insert(stream_id.clone(), entry.external_segments);
1002            }
1003            if !entry.message_records.is_empty() {
1004                machine
1005                    .message_records
1006                    .insert(stream_id.clone(), entry.message_records);
1007            }
1008            if let Some(snapshot) = entry.visible_snapshot {
1009                machine
1010                    .visible_snapshots
1011                    .insert(stream_id.clone(), snapshot);
1012            }
1013            machine.payloads.insert(stream_id.clone(), entry.payload);
1014            machine.producers.insert(stream_id.clone(), producer_states);
1015            machine.refresh_hot_start_offset(&stream_id);
1016        }
1017
1018        Ok(machine)
1019    }
1020
1021    pub fn read(
1022        &self,
1023        stream_id: &BucketStreamId,
1024        offset: u64,
1025        max_len: usize,
1026    ) -> Result<StreamRead, StreamResponse> {
1027        let plan = self.read_plan(stream_id, offset, max_len)?;
1028        if plan
1029            .segments
1030            .iter()
1031            .any(|segment| matches!(segment, StreamReadSegment::Object(_)))
1032        {
1033            return Err(StreamResponse::error_with_next_offset(
1034                StreamErrorCode::InvalidColdFlush,
1035                format!("stream '{stream_id}' read requires object payload store"),
1036                plan.next_offset,
1037            ));
1038        }
1039        let payload = plan
1040            .segments
1041            .iter()
1042            .flat_map(|segment| match segment {
1043                StreamReadSegment::Hot(payload) => payload.as_slice(),
1044                StreamReadSegment::Object(_) => unreachable!("object segments checked above"),
1045            })
1046            .copied()
1047            .collect();
1048        Ok(StreamRead {
1049            offset: plan.offset,
1050            next_offset: plan.next_offset,
1051            content_type: plan.content_type,
1052            payload,
1053            up_to_date: plan.up_to_date,
1054            closed: plan.closed,
1055        })
1056    }
1057
1058    pub fn read_plan(
1059        &self,
1060        stream_id: &BucketStreamId,
1061        offset: u64,
1062        max_len: usize,
1063    ) -> Result<StreamReadPlan, StreamResponse> {
1064        self.read_plan_at(stream_id, offset, max_len, 0)
1065    }
1066
1067    pub fn read_plan_at(
1068        &self,
1069        stream_id: &BucketStreamId,
1070        offset: u64,
1071        max_len: usize,
1072        now_ms: u64,
1073    ) -> Result<StreamReadPlan, StreamResponse> {
1074        let Some(stream) = self.streams.get(stream_id) else {
1075            return Err(StreamResponse::error(
1076                StreamErrorCode::StreamNotFound,
1077                format!("stream '{stream_id}' does not exist"),
1078            ));
1079        };
1080        if is_soft_deleted(stream) {
1081            return Err(StreamResponse::error(
1082                StreamErrorCode::StreamGone,
1083                format!("stream '{stream_id}' is gone"),
1084            ));
1085        }
1086        if stream_is_expired(stream, now_ms) {
1087            return Err(StreamResponse::error(
1088                StreamErrorCode::StreamNotFound,
1089                format!("stream '{stream_id}' does not exist"),
1090            ));
1091        }
1092        if offset > stream.tail_offset {
1093            return Err(StreamResponse::error_with_next_offset(
1094                StreamErrorCode::OffsetOutOfRange,
1095                format!(
1096                    "offset {offset} is beyond stream '{}' tail {}",
1097                    stream_id, stream.tail_offset
1098                ),
1099                stream.tail_offset,
1100            ));
1101        }
1102        let retained_offset = self.earliest_retained_offset(stream_id);
1103        if offset < retained_offset {
1104            return Err(StreamResponse::error_with_next_offset(
1105                StreamErrorCode::StreamGone,
1106                format!(
1107                    "offset {offset} is older than stream '{}' retained offset {retained_offset}",
1108                    stream_id
1109                ),
1110                retained_offset,
1111            ));
1112        }
1113
1114        let max_len_u64 = u64::try_from(max_len).unwrap_or(u64::MAX);
1115        let next_offset = stream.tail_offset.min(offset.saturating_add(max_len_u64));
1116        let payload = self
1117            .payloads
1118            .get(stream_id)
1119            .expect("payload vector exists for stream metadata");
1120        let mut segments = Vec::<(u64, StreamReadSegment)>::new();
1121        for chunk in self.cold_chunks(stream_id) {
1122            let start = offset.max(chunk.start_offset);
1123            let end = next_offset.min(chunk.end_offset);
1124            if start < end {
1125                segments.push((
1126                    start,
1127                    StreamReadSegment::Object(StreamReadObjectSegment {
1128                        object: ObjectPayloadRef::from(chunk),
1129                        read_start_offset: start,
1130                        len: usize::try_from(end - start).expect("object read len fits usize"),
1131                    }),
1132                ));
1133            }
1134        }
1135        for object in self.external_segments(stream_id) {
1136            let start = offset.max(object.start_offset);
1137            let end = next_offset.min(object.end_offset);
1138            if start < end {
1139                segments.push((
1140                    start,
1141                    StreamReadSegment::Object(StreamReadObjectSegment {
1142                        object: object.clone(),
1143                        read_start_offset: start,
1144                        len: usize::try_from(end - start).expect("object read len fits usize"),
1145                    }),
1146                ));
1147            }
1148        }
1149        for segment in self.hot_segments(stream_id) {
1150            let start = offset.max(segment.start_offset);
1151            let end = next_offset.min(segment.end_offset);
1152            if start < end {
1153                let payload_start = segment.payload_start
1154                    + usize::try_from(start - segment.start_offset)
1155                        .expect("hot segment start fits usize");
1156                let payload_end = segment.payload_start
1157                    + usize::try_from(end - segment.start_offset)
1158                        .expect("hot segment end fits usize");
1159                segments.push((
1160                    start,
1161                    StreamReadSegment::Hot(payload[payload_start..payload_end].to_vec()),
1162                ));
1163            }
1164        }
1165        segments.sort_by_key(|(start, _)| *start);
1166        if !segments_cover_range(&segments, offset, next_offset) {
1167            return Err(StreamResponse::error_with_next_offset(
1168                StreamErrorCode::InvalidColdFlush,
1169                format!("stream '{stream_id}' has missing payload segment metadata"),
1170                next_offset,
1171            ));
1172        }
1173        Ok(StreamReadPlan {
1174            offset,
1175            next_offset,
1176            content_type: stream.content_type.clone(),
1177            segments: segments.into_iter().map(|(_, segment)| segment).collect(),
1178            up_to_date: next_offset == stream.tail_offset,
1179            closed: stream.status == StreamStatus::Closed,
1180        })
1181    }
1182
1183    pub fn latest_snapshot(
1184        &self,
1185        stream_id: &BucketStreamId,
1186    ) -> Result<Option<StreamVisibleSnapshot>, StreamResponse> {
1187        let Some(stream) = self.streams.get(stream_id) else {
1188            return Err(StreamResponse::error(
1189                StreamErrorCode::StreamNotFound,
1190                format!("stream '{stream_id}' does not exist"),
1191            ));
1192        };
1193        if is_soft_deleted(stream) {
1194            return Err(StreamResponse::error(
1195                StreamErrorCode::StreamGone,
1196                format!("stream '{stream_id}' is gone"),
1197            ));
1198        }
1199        Ok(self.visible_snapshots.get(stream_id).cloned())
1200    }
1201
1202    pub fn read_snapshot(
1203        &self,
1204        stream_id: &BucketStreamId,
1205        snapshot_offset: u64,
1206    ) -> Result<StreamVisibleSnapshot, StreamResponse> {
1207        let snapshot = self.latest_snapshot(stream_id)?;
1208        match snapshot {
1209            Some(snapshot) if snapshot.offset == snapshot_offset => Ok(snapshot),
1210            _ => Err(StreamResponse::error(
1211                StreamErrorCode::SnapshotNotFound,
1212                format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
1213            )),
1214        }
1215    }
1216
1217    pub fn delete_snapshot(
1218        &self,
1219        stream_id: &BucketStreamId,
1220        snapshot_offset: u64,
1221    ) -> StreamResponse {
1222        match self.latest_snapshot(stream_id) {
1223            Ok(Some(snapshot)) if snapshot.offset == snapshot_offset => StreamResponse::error(
1224                StreamErrorCode::SnapshotConflict,
1225                format!(
1226                    "snapshot {snapshot_offset} for stream '{stream_id}' is the latest visible snapshot"
1227                ),
1228            ),
1229            Ok(_) => StreamResponse::error(
1230                StreamErrorCode::SnapshotNotFound,
1231                format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
1232            ),
1233            Err(err) => err,
1234        }
1235    }
1236
1237    pub fn bootstrap_plan(
1238        &self,
1239        stream_id: &BucketStreamId,
1240    ) -> Result<StreamBootstrapPlan, StreamResponse> {
1241        let Some(stream) = self.streams.get(stream_id) else {
1242            return Err(StreamResponse::error(
1243                StreamErrorCode::StreamNotFound,
1244                format!("stream '{stream_id}' does not exist"),
1245            ));
1246        };
1247        if is_soft_deleted(stream) {
1248            return Err(StreamResponse::error(
1249                StreamErrorCode::StreamGone,
1250                format!("stream '{stream_id}' is gone"),
1251            ));
1252        }
1253        let snapshot = self.visible_snapshots.get(stream_id).cloned();
1254        let retained_offset = snapshot
1255            .as_ref()
1256            .map(|snapshot| snapshot.offset)
1257            .unwrap_or(0);
1258        let updates = self
1259            .message_records
1260            .get(stream_id)
1261            .map(|records| {
1262                records
1263                    .iter()
1264                    .filter(|record| record.start_offset >= retained_offset)
1265                    .cloned()
1266                    .collect::<Vec<_>>()
1267            })
1268            .unwrap_or_default();
1269        Ok(StreamBootstrapPlan {
1270            snapshot,
1271            updates,
1272            next_offset: stream.tail_offset,
1273            content_type: stream.content_type.clone(),
1274            up_to_date: true,
1275            closed: stream.status == StreamStatus::Closed,
1276        })
1277    }
1278
1279    fn publish_snapshot(
1280        &mut self,
1281        stream_id: BucketStreamId,
1282        snapshot_offset: u64,
1283        content_type: String,
1284        payload: Vec<u8>,
1285        now_ms: u64,
1286    ) -> StreamResponse {
1287        if let Err(response) = self.validate_stream_scope(&stream_id) {
1288            return response;
1289        }
1290        if content_type.trim().is_empty() {
1291            return StreamResponse::error(
1292                StreamErrorCode::InvalidSnapshot,
1293                "snapshot content type must not be empty",
1294            );
1295        }
1296        let Some(stream) = self.streams.get(&stream_id) else {
1297            return StreamResponse::error(
1298                StreamErrorCode::StreamNotFound,
1299                format!("stream '{stream_id}' does not exist"),
1300            );
1301        };
1302        if is_soft_deleted(stream) {
1303            return StreamResponse::error(
1304                StreamErrorCode::StreamGone,
1305                format!("stream '{stream_id}' is gone"),
1306            );
1307        }
1308        if stream_is_expired(stream, now_ms) {
1309            self.remove_stream_state(&stream_id);
1310            return StreamResponse::error(
1311                StreamErrorCode::StreamNotFound,
1312                format!("stream '{stream_id}' does not exist"),
1313            );
1314        }
1315        let tail_offset = stream.tail_offset;
1316        let retained_offset = self.earliest_retained_offset(&stream_id);
1317        if snapshot_offset < retained_offset {
1318            return StreamResponse::error_with_next_offset(
1319                StreamErrorCode::StreamGone,
1320                format!(
1321                    "snapshot offset {snapshot_offset} is older than stream '{}' retained offset {retained_offset}",
1322                    stream_id
1323                ),
1324                retained_offset,
1325            );
1326        }
1327        if snapshot_offset > tail_offset {
1328            return StreamResponse::error_with_next_offset(
1329                StreamErrorCode::SnapshotConflict,
1330                format!(
1331                    "snapshot offset {snapshot_offset} is beyond stream '{}' tail {tail_offset}",
1332                    stream_id
1333                ),
1334                tail_offset,
1335            );
1336        }
1337        if !self.snapshot_offset_aligned(&stream_id, snapshot_offset, retained_offset) {
1338            return StreamResponse::error_with_next_offset(
1339                StreamErrorCode::InvalidSnapshot,
1340                format!(
1341                    "snapshot offset {snapshot_offset} is not aligned to a committed message boundary for stream '{stream_id}'"
1342                ),
1343                tail_offset,
1344            );
1345        }
1346
1347        self.visible_snapshots.insert(
1348            stream_id.clone(),
1349            StreamVisibleSnapshot {
1350                offset: snapshot_offset,
1351                content_type,
1352                payload,
1353            },
1354        );
1355        self.compact_retained_prefix(&stream_id, snapshot_offset);
1356        StreamResponse::SnapshotPublished { snapshot_offset }
1357    }
1358
1359    fn flush_cold(&mut self, stream_id: BucketStreamId, chunk: ColdChunkRef) -> StreamResponse {
1360        if let Err(response) = self.validate_stream_scope(&stream_id) {
1361            return response;
1362        }
1363        if chunk.s3_path.trim().is_empty() {
1364            return StreamResponse::error(
1365                StreamErrorCode::InvalidColdFlush,
1366                "cold chunk S3 path must not be empty",
1367            );
1368        }
1369        if chunk.object_size == 0 {
1370            return StreamResponse::error(
1371                StreamErrorCode::InvalidColdFlush,
1372                "cold chunk object size must be greater than zero",
1373            );
1374        }
1375        let Some(stream) = self.streams.get(&stream_id) else {
1376            return StreamResponse::error(
1377                StreamErrorCode::StreamNotFound,
1378                format!("stream '{stream_id}' does not exist"),
1379            );
1380        };
1381        if is_soft_deleted(stream) {
1382            return StreamResponse::error(
1383                StreamErrorCode::StreamGone,
1384                format!("stream '{stream_id}' is gone"),
1385            );
1386        }
1387        if chunk.end_offset <= chunk.start_offset {
1388            return StreamResponse::error_with_next_offset(
1389                StreamErrorCode::InvalidColdFlush,
1390                "cold chunk must cover at least one byte",
1391                stream.tail_offset,
1392            );
1393        }
1394        if chunk.end_offset > stream.tail_offset {
1395            return StreamResponse::error_with_next_offset(
1396                StreamErrorCode::InvalidColdFlush,
1397                format!(
1398                    "cold chunk end {} is beyond stream '{}' tail {}",
1399                    chunk.end_offset, stream_id, stream.tail_offset
1400                ),
1401                stream.tail_offset,
1402            );
1403        }
1404        let segments = self.hot_segments(&stream_id);
1405        let Some(segment_index) = segments
1406            .iter()
1407            .position(|segment| segment.start_offset == chunk.start_offset)
1408        else {
1409            return StreamResponse::error_with_next_offset(
1410                StreamErrorCode::InvalidColdFlush,
1411                format!(
1412                    "cold chunk for stream '{stream_id}' does not match the start of a hot payload segment"
1413                ),
1414                stream.tail_offset,
1415            );
1416        };
1417
1418        let drain_start = segments[segment_index].payload_start;
1419        let mut covered_offset = chunk.start_offset;
1420        let mut flush_len = 0usize;
1421        for segment in segments.iter().skip(segment_index) {
1422            if segment.start_offset != covered_offset {
1423                break;
1424            }
1425            let segment_cover_end = chunk.end_offset.min(segment.end_offset);
1426            let segment_flush_len = match usize::try_from(segment_cover_end - segment.start_offset)
1427            {
1428                Ok(segment_flush_len) => segment_flush_len,
1429                Err(_) => {
1430                    return StreamResponse::error_with_next_offset(
1431                        StreamErrorCode::InvalidColdFlush,
1432                        "cold chunk length does not fit in memory",
1433                        stream.tail_offset,
1434                    );
1435                }
1436            };
1437            let Some(expected_payload_start) = drain_start.checked_add(flush_len) else {
1438                return StreamResponse::error_with_next_offset(
1439                    StreamErrorCode::InvalidColdFlush,
1440                    "cold chunk length does not fit in memory",
1441                    stream.tail_offset,
1442                );
1443            };
1444            if segment.payload_start != expected_payload_start {
1445                return StreamResponse::error_with_next_offset(
1446                    StreamErrorCode::InvalidColdFlush,
1447                    format!("stream '{stream_id}' has non-contiguous hot payload metadata"),
1448                    stream.tail_offset,
1449                );
1450            }
1451            let segment_payload_len = segment.payload_end - segment.payload_start;
1452            if segment_flush_len > segment_payload_len {
1453                return StreamResponse::error_with_next_offset(
1454                    StreamErrorCode::InvalidColdFlush,
1455                    format!("cold chunk length exceeds stream '{stream_id}' hot segment metadata"),
1456                    stream.tail_offset,
1457                );
1458            }
1459            let Some(new_flush_len) = flush_len.checked_add(segment_flush_len) else {
1460                return StreamResponse::error_with_next_offset(
1461                    StreamErrorCode::InvalidColdFlush,
1462                    "cold chunk length does not fit in memory",
1463                    stream.tail_offset,
1464                );
1465            };
1466            flush_len = new_flush_len;
1467            covered_offset = segment_cover_end;
1468            if covered_offset == chunk.end_offset {
1469                break;
1470            }
1471        }
1472        if covered_offset != chunk.end_offset {
1473            return StreamResponse::error_with_next_offset(
1474                StreamErrorCode::InvalidColdFlush,
1475                format!(
1476                    "cold chunk for stream '{stream_id}' does not cover contiguous hot payload segments"
1477                ),
1478                stream.tail_offset,
1479            );
1480        }
1481        let Some(drain_end) = drain_start.checked_add(flush_len) else {
1482            return StreamResponse::error_with_next_offset(
1483                StreamErrorCode::InvalidColdFlush,
1484                "cold chunk length does not fit in memory",
1485                stream.tail_offset,
1486            );
1487        };
1488        let payload_len = self
1489            .payloads
1490            .get(&stream_id)
1491            .expect("payload vector exists for stream metadata")
1492            .len();
1493        if drain_end > payload_len {
1494            return StreamResponse::error_with_next_offset(
1495                StreamErrorCode::InvalidColdFlush,
1496                format!("cold chunk length exceeds stream '{stream_id}' hot payload length"),
1497                stream.tail_offset,
1498            );
1499        };
1500
1501        self.payloads
1502            .get_mut(&stream_id)
1503            .expect("payload vector exists for stream metadata")
1504            .drain(drain_start..drain_end);
1505        self.remove_drained_hot_range(
1506            &stream_id,
1507            segment_index,
1508            chunk.end_offset,
1509            drain_start,
1510            flush_len,
1511        );
1512        self.cold_chunks
1513            .entry(stream_id.clone())
1514            .or_default()
1515            .push(chunk.clone());
1516        self.refresh_hot_start_offset(&stream_id);
1517        StreamResponse::ColdFlushed {
1518            hot_start_offset: self.hot_start_offset(&stream_id),
1519        }
1520    }
1521
1522    fn create_bucket(&mut self, bucket_id: String) -> StreamResponse {
1523        if let Err(message) = validate_bucket_id(&bucket_id) {
1524            return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1525        }
1526        if !self.buckets.insert(bucket_id.clone()) {
1527            return StreamResponse::BucketAlreadyExists { bucket_id };
1528        }
1529        StreamResponse::BucketCreated { bucket_id }
1530    }
1531
1532    fn delete_bucket(&mut self, bucket_id: &str) -> StreamResponse {
1533        if let Err(message) = validate_bucket_id(bucket_id) {
1534            return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1535        }
1536        if !self.buckets.contains(bucket_id) {
1537            return StreamResponse::error(
1538                StreamErrorCode::BucketNotFound,
1539                format!("bucket '{bucket_id}' does not exist"),
1540            );
1541        }
1542        if self
1543            .streams
1544            .keys()
1545            .any(|stream_id| stream_id.bucket_id == bucket_id)
1546        {
1547            return StreamResponse::error(
1548                StreamErrorCode::BucketNotEmpty,
1549                format!("bucket '{bucket_id}' is not empty"),
1550            );
1551        }
1552        self.buckets.remove(bucket_id);
1553        StreamResponse::BucketDeleted {
1554            bucket_id: bucket_id.to_owned(),
1555        }
1556    }
1557
1558    fn create_stream(&mut self, input: CreateStreamInput) -> StreamResponse {
1559        if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1560            return response;
1561        }
1562        if let Err(response) =
1563            validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1564        {
1565            return response;
1566        }
1567        if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1568            return response;
1569        }
1570        if let Some(producer) = input.producer.as_ref()
1571            && producer.producer_seq != 0
1572        {
1573            return StreamResponse::error(
1574                StreamErrorCode::ProducerSeqConflict,
1575                format!(
1576                    "producer '{}' expected sequence 0, received {}",
1577                    producer.producer_id, producer.producer_seq
1578                ),
1579            );
1580        }
1581        if self
1582            .streams
1583            .get(&input.stream_id)
1584            .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1585        {
1586            self.remove_stream_state(&input.stream_id);
1587        }
1588
1589        if let Some(existing) = self.streams.get(&input.stream_id) {
1590            if is_soft_deleted(existing) {
1591                return StreamResponse::error(
1592                    StreamErrorCode::StreamAlreadyExistsConflict,
1593                    format!(
1594                        "stream '{}' is gone and cannot be recreated yet",
1595                        input.stream_id
1596                    ),
1597                );
1598            }
1599            if existing.content_type == input.content_type
1600                && existing.status == status_from_closed(input.close_after)
1601                && existing.stream_ttl_seconds == input.stream_ttl_seconds
1602                && existing.stream_expires_at_ms == input.stream_expires_at_ms
1603                && existing.forked_from == input.forked_from
1604                && existing.fork_offset == input.fork_offset
1605            {
1606                return StreamResponse::AlreadyExists {
1607                    next_offset: existing.tail_offset,
1608                    closed: existing.status == StreamStatus::Closed,
1609                    content_type: existing.content_type.clone(),
1610                    stream_ttl_seconds: existing.stream_ttl_seconds,
1611                    stream_expires_at_ms: existing.stream_expires_at_ms,
1612                };
1613            }
1614            return StreamResponse::error(
1615                StreamErrorCode::StreamAlreadyExistsConflict,
1616                format!(
1617                    "stream '{}' already exists with different metadata",
1618                    input.stream_id
1619                ),
1620            );
1621        }
1622
1623        let initial_len = input.initial_len();
1624        let metadata = StreamMetadata {
1625            stream_id: input.stream_id.clone(),
1626            content_type: input.content_type,
1627            status: status_from_closed(input.close_after),
1628            tail_offset: initial_len,
1629            last_stream_seq: input.stream_seq,
1630            stream_ttl_seconds: input.stream_ttl_seconds,
1631            stream_expires_at_ms: input.stream_expires_at_ms,
1632            created_at_ms: input.now_ms,
1633            last_ttl_touch_at_ms: input.now_ms,
1634            forked_from: input.forked_from,
1635            fork_offset: input.fork_offset,
1636            fork_ref_count: 0,
1637        };
1638        self.streams.insert(input.stream_id.clone(), metadata);
1639        self.payloads
1640            .insert(input.stream_id.clone(), input.initial_payload);
1641        if initial_len > 0 {
1642            self.hot_segments.insert(
1643                input.stream_id.clone(),
1644                vec![HotPayloadSegment {
1645                    start_offset: 0,
1646                    end_offset: initial_len,
1647                    payload_start: 0,
1648                    payload_end: usize::try_from(initial_len).expect("payload len fits usize"),
1649                }],
1650            );
1651            self.message_records.insert(
1652                input.stream_id.clone(),
1653                vec![StreamMessageRecord {
1654                    start_offset: 0,
1655                    end_offset: initial_len,
1656                }],
1657            );
1658        }
1659        let mut producer_states = HashMap::new();
1660        if let Some(producer) = input.producer {
1661            let last_item = ProducerAppendRecord {
1662                start_offset: 0,
1663                next_offset: initial_len,
1664                closed: input.close_after,
1665            };
1666            producer_states.insert(
1667                producer.producer_id,
1668                ProducerState {
1669                    producer_epoch: producer.producer_epoch,
1670                    producer_seq: producer.producer_seq,
1671                    last_start_offset: last_item.start_offset,
1672                    last_next_offset: last_item.next_offset,
1673                    last_closed: last_item.closed,
1674                    last_items: vec![last_item],
1675                },
1676            );
1677        }
1678        self.producers
1679            .insert(input.stream_id.clone(), producer_states);
1680        StreamResponse::Created {
1681            stream_id: input.stream_id,
1682            next_offset: initial_len,
1683            closed: input.close_after,
1684        }
1685    }
1686
1687    fn create_external_stream(&mut self, input: CreateExternalStreamInput) -> StreamResponse {
1688        if let Err(response) = validate_external_payload_ref(&input.initial_payload) {
1689            return response;
1690        }
1691        if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1692            return response;
1693        }
1694        if let Err(response) =
1695            validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1696        {
1697            return response;
1698        }
1699        if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1700            return response;
1701        }
1702        if let Some(producer) = input.producer.as_ref()
1703            && producer.producer_seq != 0
1704        {
1705            return StreamResponse::error(
1706                StreamErrorCode::ProducerSeqConflict,
1707                format!(
1708                    "producer '{}' expected sequence 0, received {}",
1709                    producer.producer_id, producer.producer_seq
1710                ),
1711            );
1712        }
1713        if self
1714            .streams
1715            .get(&input.stream_id)
1716            .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1717        {
1718            self.remove_stream_state(&input.stream_id);
1719        }
1720
1721        if let Some(existing) = self.streams.get(&input.stream_id) {
1722            if is_soft_deleted(existing) {
1723                return StreamResponse::error(
1724                    StreamErrorCode::StreamAlreadyExistsConflict,
1725                    format!(
1726                        "stream '{}' is gone and cannot be recreated yet",
1727                        input.stream_id
1728                    ),
1729                );
1730            }
1731            if existing.content_type == input.content_type
1732                && existing.status == status_from_closed(input.close_after)
1733                && existing.stream_ttl_seconds == input.stream_ttl_seconds
1734                && existing.stream_expires_at_ms == input.stream_expires_at_ms
1735                && existing.forked_from == input.forked_from
1736                && existing.fork_offset == input.fork_offset
1737            {
1738                return StreamResponse::AlreadyExists {
1739                    next_offset: existing.tail_offset,
1740                    closed: existing.status == StreamStatus::Closed,
1741                    content_type: existing.content_type.clone(),
1742                    stream_ttl_seconds: existing.stream_ttl_seconds,
1743                    stream_expires_at_ms: existing.stream_expires_at_ms,
1744                };
1745            }
1746            return StreamResponse::error(
1747                StreamErrorCode::StreamAlreadyExistsConflict,
1748                format!(
1749                    "stream '{}' already exists with different metadata",
1750                    input.stream_id
1751                ),
1752            );
1753        }
1754
1755        let initial_len = input.initial_payload.payload_len;
1756        let metadata = StreamMetadata {
1757            stream_id: input.stream_id.clone(),
1758            content_type: input.content_type,
1759            status: status_from_closed(input.close_after),
1760            tail_offset: initial_len,
1761            last_stream_seq: input.stream_seq,
1762            stream_ttl_seconds: input.stream_ttl_seconds,
1763            stream_expires_at_ms: input.stream_expires_at_ms,
1764            created_at_ms: input.now_ms,
1765            last_ttl_touch_at_ms: input.now_ms,
1766            forked_from: input.forked_from,
1767            fork_offset: input.fork_offset,
1768            fork_ref_count: 0,
1769        };
1770        self.streams.insert(input.stream_id.clone(), metadata);
1771        self.payloads.insert(input.stream_id.clone(), Vec::new());
1772        self.external_segments.insert(
1773            input.stream_id.clone(),
1774            vec![ObjectPayloadRef {
1775                start_offset: 0,
1776                end_offset: initial_len,
1777                s3_path: input.initial_payload.s3_path,
1778                object_size: input.initial_payload.object_size,
1779            }],
1780        );
1781        self.message_records.insert(
1782            input.stream_id.clone(),
1783            vec![StreamMessageRecord {
1784                start_offset: 0,
1785                end_offset: initial_len,
1786            }],
1787        );
1788        let mut producer_states = HashMap::new();
1789        if let Some(producer) = input.producer {
1790            let last_item = ProducerAppendRecord {
1791                start_offset: 0,
1792                next_offset: initial_len,
1793                closed: input.close_after,
1794            };
1795            producer_states.insert(
1796                producer.producer_id,
1797                ProducerState {
1798                    producer_epoch: producer.producer_epoch,
1799                    producer_seq: producer.producer_seq,
1800                    last_start_offset: last_item.start_offset,
1801                    last_next_offset: last_item.next_offset,
1802                    last_closed: last_item.closed,
1803                    last_items: vec![last_item],
1804                },
1805            );
1806        }
1807        self.producers
1808            .insert(input.stream_id.clone(), producer_states);
1809        StreamResponse::Created {
1810            stream_id: input.stream_id,
1811            next_offset: initial_len,
1812            closed: input.close_after,
1813        }
1814    }
1815
1816    pub fn append_borrowed(&mut self, input: AppendStreamInput<'_>) -> StreamResponse {
1817        let AppendStreamInput {
1818            stream_id,
1819            content_type,
1820            payload,
1821            close_after,
1822            stream_seq,
1823            producer,
1824            now_ms,
1825        } = input;
1826        if let Err(response) = self.validate_stream_scope(&stream_id) {
1827            return response;
1828        }
1829        if let Err(response) = validate_producer_request(producer.as_ref()) {
1830            return response;
1831        }
1832
1833        let Some(_) = self.streams.get(&stream_id) else {
1834            return StreamResponse::error(
1835                StreamErrorCode::StreamNotFound,
1836                format!("stream '{stream_id}' does not exist"),
1837            );
1838        };
1839        if self.expire_stream_if_due(&stream_id, now_ms) {
1840            return StreamResponse::error(
1841                StreamErrorCode::StreamNotFound,
1842                format!("stream '{stream_id}' does not exist"),
1843            );
1844        }
1845        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1846            return StreamResponse::error(
1847                StreamErrorCode::StreamGone,
1848                format!("stream '{stream_id}' is gone"),
1849            );
1850        }
1851        let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1852            Ok(decision) => decision,
1853            Err(response) => return response,
1854        };
1855        if let ProducerDecision::Duplicate {
1856            offset,
1857            next_offset,
1858            closed,
1859            producer,
1860            ..
1861        } = producer_decision
1862        {
1863            if payload.is_empty() {
1864                return StreamResponse::Closed {
1865                    next_offset,
1866                    deduplicated: true,
1867                    producer: Some(producer),
1868                };
1869            }
1870            return StreamResponse::Appended {
1871                offset,
1872                next_offset,
1873                closed,
1874                deduplicated: true,
1875                producer: Some(producer),
1876            };
1877        }
1878
1879        let Some(stream) = self.streams.get_mut(&stream_id) else {
1880            unreachable!("stream existence checked before producer evaluation");
1881        };
1882
1883        if stream.status == StreamStatus::Closed {
1884            if close_after && payload.is_empty() {
1885                return StreamResponse::Closed {
1886                    next_offset: stream.tail_offset,
1887                    deduplicated: false,
1888                    producer: None,
1889                };
1890            }
1891            return StreamResponse::error_with_next_offset(
1892                StreamErrorCode::StreamClosed,
1893                format!("stream '{stream_id}' is closed"),
1894                stream.tail_offset,
1895            );
1896        }
1897
1898        if payload.is_empty() && !close_after {
1899            return StreamResponse::error(
1900                StreamErrorCode::EmptyAppend,
1901                "append payload must be non-empty unless closing the stream",
1902            );
1903        }
1904
1905        if !payload.is_empty() {
1906            let Some(content_type) = content_type else {
1907                return StreamResponse::error(
1908                    StreamErrorCode::MissingContentType,
1909                    "append with a body must include content type",
1910                );
1911            };
1912            if content_type != stream.content_type {
1913                return StreamResponse::error_with_next_offset(
1914                    StreamErrorCode::ContentTypeMismatch,
1915                    format!(
1916                        "append content type '{content_type}' does not match stream content type '{}'",
1917                        stream.content_type
1918                    ),
1919                    stream.tail_offset,
1920                );
1921            }
1922        }
1923
1924        if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1925            return response;
1926        }
1927
1928        let offset = stream.tail_offset;
1929        let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1930        stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1931        if let Some(seq) = stream_seq {
1932            stream.last_stream_seq = Some(seq);
1933        }
1934        renew_stream_ttl(stream, now_ms);
1935        if close_after {
1936            stream.status = StreamStatus::Closed;
1937        }
1938        let closed = stream.status == StreamStatus::Closed;
1939        let next_offset = stream.tail_offset;
1940        let producer_ack = producer.clone();
1941        if let Some(producer) = producer {
1942            self.record_producer_success(
1943                stream_id.clone(),
1944                producer,
1945                ProducerAppendRecord {
1946                    start_offset: offset,
1947                    next_offset,
1948                    closed,
1949                },
1950                vec![ProducerAppendRecord {
1951                    start_offset: offset,
1952                    next_offset,
1953                    closed,
1954                }],
1955            );
1956        }
1957
1958        if payload.is_empty() {
1959            StreamResponse::Closed {
1960                next_offset,
1961                deduplicated: false,
1962                producer: producer_ack,
1963            }
1964        } else {
1965            let payload_store = self
1966                .payloads
1967                .get_mut(&stream_id)
1968                .expect("payload vector exists for stream metadata");
1969            let payload_start = payload_store.len();
1970            payload_store.extend_from_slice(payload);
1971            let payload_end = payload_store.len();
1972            self.hot_segments
1973                .get_mut(&stream_id)
1974                .map(|segments| {
1975                    segments.push(HotPayloadSegment {
1976                        start_offset: offset,
1977                        end_offset: next_offset,
1978                        payload_start,
1979                        payload_end,
1980                    })
1981                })
1982                .unwrap_or_else(|| {
1983                    self.hot_segments.insert(
1984                        stream_id.clone(),
1985                        vec![HotPayloadSegment {
1986                            start_offset: offset,
1987                            end_offset: next_offset,
1988                            payload_start,
1989                            payload_end,
1990                        }],
1991                    );
1992                });
1993            self.refresh_hot_start_offset(&stream_id);
1994            self.message_records
1995                .entry(stream_id.clone())
1996                .or_default()
1997                .push(StreamMessageRecord {
1998                    start_offset: offset,
1999                    end_offset: next_offset,
2000                });
2001            StreamResponse::Appended {
2002                offset,
2003                next_offset,
2004                closed: close_after,
2005                deduplicated: false,
2006                producer: producer_ack,
2007            }
2008        }
2009    }
2010
2011    fn append_external(&mut self, input: AppendExternalInput<'_>) -> StreamResponse {
2012        let AppendExternalInput {
2013            stream_id,
2014            content_type,
2015            payload,
2016            close_after,
2017            stream_seq,
2018            producer,
2019            now_ms,
2020        } = input;
2021        if let Err(response) = validate_external_payload_ref(&payload) {
2022            return response;
2023        }
2024        if let Err(response) = self.validate_stream_scope(&stream_id) {
2025            return response;
2026        }
2027        if let Err(response) = validate_producer_request(producer.as_ref()) {
2028            return response;
2029        }
2030        let Some(_) = self.streams.get(&stream_id) else {
2031            return StreamResponse::error(
2032                StreamErrorCode::StreamNotFound,
2033                format!("stream '{stream_id}' does not exist"),
2034            );
2035        };
2036        if self.expire_stream_if_due(&stream_id, now_ms) {
2037            return StreamResponse::error(
2038                StreamErrorCode::StreamNotFound,
2039                format!("stream '{stream_id}' does not exist"),
2040            );
2041        }
2042        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
2043            return StreamResponse::error(
2044                StreamErrorCode::StreamGone,
2045                format!("stream '{stream_id}' is gone"),
2046            );
2047        }
2048        let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
2049            Ok(decision) => decision,
2050            Err(response) => return response,
2051        };
2052        if let ProducerDecision::Duplicate {
2053            offset,
2054            next_offset,
2055            closed,
2056            producer,
2057            ..
2058        } = producer_decision
2059        {
2060            return StreamResponse::Appended {
2061                offset,
2062                next_offset,
2063                closed,
2064                deduplicated: true,
2065                producer: Some(producer),
2066            };
2067        }
2068
2069        let Some(stream) = self.streams.get(&stream_id) else {
2070            unreachable!("stream existence checked before producer evaluation");
2071        };
2072        if stream.status == StreamStatus::Closed {
2073            return StreamResponse::error_with_next_offset(
2074                StreamErrorCode::StreamClosed,
2075                format!("stream '{stream_id}' is closed"),
2076                stream.tail_offset,
2077            );
2078        }
2079        let Some(content_type) = content_type else {
2080            return StreamResponse::error(
2081                StreamErrorCode::MissingContentType,
2082                "append with a body must include content type",
2083            );
2084        };
2085        if content_type != stream.content_type {
2086            return StreamResponse::error_with_next_offset(
2087                StreamErrorCode::ContentTypeMismatch,
2088                format!(
2089                    "append content type '{content_type}' does not match stream content type '{}'",
2090                    stream.content_type
2091                ),
2092                stream.tail_offset,
2093            );
2094        }
2095        if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
2096            return response;
2097        }
2098        let offset = stream.tail_offset;
2099        let next_offset = offset.saturating_add(payload.payload_len);
2100        let stream = self
2101            .streams
2102            .get_mut(&stream_id)
2103            .expect("stream existence checked before external append mutation");
2104        stream.tail_offset = next_offset;
2105        if let Some(seq) = stream_seq {
2106            stream.last_stream_seq = Some(seq);
2107        }
2108        renew_stream_ttl(stream, now_ms);
2109        if close_after {
2110            stream.status = StreamStatus::Closed;
2111        }
2112        let closed = stream.status == StreamStatus::Closed;
2113        let producer_ack = producer.clone();
2114        if let Some(producer) = producer {
2115            self.record_producer_success(
2116                stream_id.clone(),
2117                producer,
2118                ProducerAppendRecord {
2119                    start_offset: offset,
2120                    next_offset,
2121                    closed,
2122                },
2123                vec![ProducerAppendRecord {
2124                    start_offset: offset,
2125                    next_offset,
2126                    closed,
2127                }],
2128            );
2129        }
2130        self.external_segments
2131            .entry(stream_id.clone())
2132            .or_default()
2133            .push(ObjectPayloadRef {
2134                start_offset: offset,
2135                end_offset: next_offset,
2136                s3_path: payload.s3_path,
2137                object_size: payload.object_size,
2138            });
2139        self.message_records
2140            .entry(stream_id.clone())
2141            .or_default()
2142            .push(StreamMessageRecord {
2143                start_offset: offset,
2144                end_offset: next_offset,
2145            });
2146        StreamResponse::Appended {
2147            offset,
2148            next_offset,
2149            closed: close_after,
2150            deduplicated: false,
2151            producer: producer_ack,
2152        }
2153    }
2154
2155    pub fn append_batch_borrowed(
2156        &mut self,
2157        stream_id: BucketStreamId,
2158        content_type: Option<&str>,
2159        payloads: &[&[u8]],
2160        producer: Option<ProducerRequest>,
2161        now_ms: u64,
2162    ) -> Result<StreamBatchAppend, StreamResponse> {
2163        if payloads.is_empty() {
2164            return Err(StreamResponse::error(
2165                StreamErrorCode::EmptyAppend,
2166                "append batch must contain at least one payload",
2167            ));
2168        }
2169        self.validate_stream_scope(&stream_id)?;
2170        validate_producer_request(producer.as_ref())?;
2171        if self.expire_stream_if_due(&stream_id, now_ms) {
2172            return Err(StreamResponse::error(
2173                StreamErrorCode::StreamNotFound,
2174                format!("stream '{stream_id}' does not exist"),
2175            ));
2176        }
2177        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
2178            return Err(StreamResponse::error(
2179                StreamErrorCode::StreamGone,
2180                format!("stream '{stream_id}' is gone"),
2181            ));
2182        }
2183        let producer_decision = self.evaluate_producer(&stream_id, producer.as_ref())?;
2184        if let ProducerDecision::Duplicate { items, .. } = producer_decision {
2185            return Ok(StreamBatchAppend {
2186                items: items
2187                    .into_iter()
2188                    .map(|item| StreamBatchAppendItem {
2189                        offset: item.start_offset,
2190                        next_offset: item.next_offset,
2191                        closed: item.closed,
2192                        deduplicated: true,
2193                    })
2194                    .collect(),
2195                deduplicated: true,
2196            });
2197        }
2198
2199        let Some(stream) = self.streams.get_mut(&stream_id) else {
2200            return Err(StreamResponse::error(
2201                StreamErrorCode::StreamNotFound,
2202                format!("stream '{stream_id}' does not exist"),
2203            ));
2204        };
2205        if stream.status == StreamStatus::Closed {
2206            return Err(StreamResponse::error_with_next_offset(
2207                StreamErrorCode::StreamClosed,
2208                format!("stream '{stream_id}' is closed"),
2209                stream.tail_offset,
2210            ));
2211        }
2212        let Some(content_type) = content_type else {
2213            return Err(StreamResponse::error(
2214                StreamErrorCode::MissingContentType,
2215                "append batch must include content type",
2216            ));
2217        };
2218        if content_type != stream.content_type {
2219            return Err(StreamResponse::error_with_next_offset(
2220                StreamErrorCode::ContentTypeMismatch,
2221                format!(
2222                    "append content type '{content_type}' does not match stream content type '{}'",
2223                    stream.content_type
2224                ),
2225                stream.tail_offset,
2226            ));
2227        }
2228        if payloads.iter().any(|payload| payload.is_empty()) {
2229            return Err(StreamResponse::error(
2230                StreamErrorCode::EmptyAppend,
2231                "append batch payloads must be non-empty",
2232            ));
2233        }
2234
2235        let mut items = Vec::with_capacity(payloads.len());
2236        for payload in payloads {
2237            let offset = stream.tail_offset;
2238            let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
2239            stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
2240            items.push(ProducerAppendRecord {
2241                start_offset: offset,
2242                next_offset: stream.tail_offset,
2243                closed: false,
2244            });
2245        }
2246        let last = items
2247            .last()
2248            .expect("payloads checked non-empty before append")
2249            .clone();
2250        renew_stream_ttl(stream, now_ms);
2251        if let Some(producer) = producer {
2252            self.record_producer_success(stream_id.clone(), producer, last.clone(), items.clone());
2253        }
2254        let payload_store = self
2255            .payloads
2256            .get_mut(&stream_id)
2257            .expect("payload vector exists for stream metadata");
2258        let hot_segments = self.hot_segments.entry(stream_id.clone()).or_default();
2259        for (item, payload) in items.iter().zip(payloads.iter()) {
2260            let payload_start = payload_store.len();
2261            payload_store.extend_from_slice(payload);
2262            let payload_end = payload_store.len();
2263            hot_segments.push(HotPayloadSegment {
2264                start_offset: item.start_offset,
2265                end_offset: item.next_offset,
2266                payload_start,
2267                payload_end,
2268            });
2269        }
2270        self.refresh_hot_start_offset(&stream_id);
2271        self.message_records
2272            .entry(stream_id.clone())
2273            .or_default()
2274            .extend(items.iter().map(|item| StreamMessageRecord {
2275                start_offset: item.start_offset,
2276                end_offset: item.next_offset,
2277            }));
2278        Ok(StreamBatchAppend {
2279            items: items
2280                .into_iter()
2281                .map(|item| StreamBatchAppendItem {
2282                    offset: item.start_offset,
2283                    next_offset: item.next_offset,
2284                    closed: item.closed,
2285                    deduplicated: false,
2286                })
2287                .collect(),
2288            deduplicated: false,
2289        })
2290    }
2291
2292    fn close(
2293        &mut self,
2294        stream_id: BucketStreamId,
2295        stream_seq: Option<String>,
2296        producer: Option<ProducerRequest>,
2297        now_ms: u64,
2298    ) -> StreamResponse {
2299        self.append_borrowed(AppendStreamInput {
2300            stream_id,
2301            content_type: None,
2302            payload: &[],
2303            close_after: true,
2304            stream_seq,
2305            producer,
2306            now_ms,
2307        })
2308    }
2309
2310    fn delete_stream(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
2311        if let Err(response) = self.validate_stream_scope(stream_id) {
2312            return response;
2313        }
2314        let Some(stream) = self.streams.get_mut(stream_id) else {
2315            return StreamResponse::error(
2316                StreamErrorCode::StreamNotFound,
2317                format!("stream '{stream_id}' does not exist"),
2318            );
2319        };
2320        if is_soft_deleted(stream) {
2321            return StreamResponse::error(
2322                StreamErrorCode::StreamGone,
2323                format!("stream '{stream_id}' is gone"),
2324            );
2325        }
2326        if stream.fork_ref_count > 0 {
2327            stream.status = StreamStatus::SoftDeleted;
2328            return StreamResponse::Deleted {
2329                hard_deleted: false,
2330                parent_to_release: None,
2331            };
2332        }
2333        let parent_to_release = stream.forked_from.clone();
2334        self.remove_stream_state(stream_id);
2335        StreamResponse::Deleted {
2336            hard_deleted: true,
2337            parent_to_release,
2338        }
2339    }
2340
2341    fn add_fork_ref(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> StreamResponse {
2342        if let Err(response) = self.validate_stream_scope(stream_id) {
2343            return response;
2344        }
2345        if self.expire_stream_if_due(stream_id, now_ms) {
2346            return StreamResponse::error(
2347                StreamErrorCode::StreamNotFound,
2348                format!("stream '{stream_id}' does not exist"),
2349            );
2350        }
2351        let Some(stream) = self.streams.get_mut(stream_id) else {
2352            return StreamResponse::error(
2353                StreamErrorCode::StreamNotFound,
2354                format!("stream '{stream_id}' does not exist"),
2355            );
2356        };
2357        if is_soft_deleted(stream) {
2358            return StreamResponse::error(
2359                StreamErrorCode::StreamGone,
2360                format!("stream '{stream_id}' is gone"),
2361            );
2362        }
2363        stream.fork_ref_count = stream.fork_ref_count.saturating_add(1);
2364        StreamResponse::ForkRefAdded {
2365            fork_ref_count: stream.fork_ref_count,
2366        }
2367    }
2368
2369    fn release_fork_ref(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
2370        if let Err(response) = self.validate_stream_scope(stream_id) {
2371            return response;
2372        }
2373        let Some(stream) = self.streams.get_mut(stream_id) else {
2374            return StreamResponse::ForkRefReleased {
2375                hard_deleted: false,
2376                fork_ref_count: 0,
2377                parent_to_release: None,
2378            };
2379        };
2380        if stream.fork_ref_count == 0 {
2381            return StreamResponse::error(
2382                StreamErrorCode::InvalidFork,
2383                format!("stream '{stream_id}' has no fork reference to release"),
2384            );
2385        }
2386        stream.fork_ref_count -= 1;
2387        if stream.fork_ref_count == 0 && is_soft_deleted(stream) {
2388            let parent_to_release = stream.forked_from.clone();
2389            self.remove_stream_state(stream_id);
2390            return StreamResponse::ForkRefReleased {
2391                hard_deleted: true,
2392                fork_ref_count: 0,
2393                parent_to_release,
2394            };
2395        }
2396        StreamResponse::ForkRefReleased {
2397            hard_deleted: false,
2398            fork_ref_count: stream.fork_ref_count,
2399            parent_to_release: None,
2400        }
2401    }
2402
2403    fn touch_stream_access(
2404        &mut self,
2405        stream_id: &BucketStreamId,
2406        now_ms: u64,
2407        renew_ttl: bool,
2408    ) -> StreamResponse {
2409        if let Err(response) = self.validate_stream_scope(stream_id) {
2410            return response;
2411        }
2412        let Some(stream) = self.streams.get(stream_id) else {
2413            return StreamResponse::error(
2414                StreamErrorCode::StreamNotFound,
2415                format!("stream '{stream_id}' does not exist"),
2416            );
2417        };
2418        if is_soft_deleted(stream) {
2419            return StreamResponse::error(
2420                StreamErrorCode::StreamGone,
2421                format!("stream '{stream_id}' is gone"),
2422            );
2423        }
2424        if stream_is_expired(stream, now_ms) {
2425            self.remove_stream_state(stream_id);
2426            return StreamResponse::Accessed {
2427                changed: true,
2428                expired: true,
2429            };
2430        }
2431        let changed = if renew_ttl && stream.stream_ttl_seconds.is_some() {
2432            let stream = self
2433                .streams
2434                .get_mut(stream_id)
2435                .expect("stream existence checked before TTL renewal");
2436            let previous = stream.last_ttl_touch_at_ms;
2437            renew_stream_ttl(stream, now_ms);
2438            stream.last_ttl_touch_at_ms != previous
2439        } else {
2440            false
2441        };
2442        StreamResponse::Accessed {
2443            changed,
2444            expired: false,
2445        }
2446    }
2447
2448    fn expire_stream_if_due(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> bool {
2449        if self
2450            .streams
2451            .get(stream_id)
2452            .is_some_and(|stream| stream_is_expired(stream, now_ms))
2453        {
2454            self.remove_stream_state(stream_id);
2455            return true;
2456        }
2457        false
2458    }
2459
2460    fn remove_stream_state(&mut self, stream_id: &BucketStreamId) -> bool {
2461        if self.streams.remove(stream_id).is_some() {
2462            self.payloads.remove(stream_id);
2463            self.hot_segments.remove(stream_id);
2464            self.hot_start_offsets.remove(stream_id);
2465            self.cold_chunks.remove(stream_id);
2466            self.external_segments.remove(stream_id);
2467            self.message_records.remove(stream_id);
2468            self.visible_snapshots.remove(stream_id);
2469            self.producers.remove(stream_id);
2470            true
2471        } else {
2472            false
2473        }
2474    }
2475
2476    fn validate_stream_scope(&self, stream_id: &BucketStreamId) -> Result<(), StreamResponse> {
2477        if let Err(message) = validate_bucket_id(&stream_id.bucket_id) {
2478            return Err(StreamResponse::error(
2479                StreamErrorCode::InvalidBucketId,
2480                message,
2481            ));
2482        }
2483        if let Err(message) = validate_stream_id(stream_id) {
2484            return Err(StreamResponse::error(
2485                StreamErrorCode::InvalidStreamId,
2486                message,
2487            ));
2488        }
2489        if !self.buckets.contains(&stream_id.bucket_id) {
2490            return Err(StreamResponse::error(
2491                StreamErrorCode::BucketNotFound,
2492                format!("bucket '{}' does not exist", stream_id.bucket_id),
2493            ));
2494        }
2495        Ok(())
2496    }
2497
2498    fn earliest_retained_offset(&self, stream_id: &BucketStreamId) -> u64 {
2499        self.visible_snapshots
2500            .get(stream_id)
2501            .map(|snapshot| snapshot.offset)
2502            .unwrap_or(0)
2503    }
2504
2505    fn snapshot_offset_aligned(
2506        &self,
2507        stream_id: &BucketStreamId,
2508        snapshot_offset: u64,
2509        retained_offset: u64,
2510    ) -> bool {
2511        snapshot_offset == retained_offset
2512            || self.message_records.get(stream_id).is_some_and(|records| {
2513                records
2514                    .iter()
2515                    .any(|record| record.end_offset == snapshot_offset)
2516            })
2517    }
2518
2519    fn compact_retained_prefix(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
2520        if let Some(records) = self.message_records.get_mut(stream_id) {
2521            records.retain(|record| record.end_offset > retained_offset);
2522            if records.is_empty() {
2523                self.message_records.remove(stream_id);
2524            }
2525        }
2526        if let Some(chunks) = self.cold_chunks.get_mut(stream_id) {
2527            chunks.retain(|chunk| chunk.end_offset > retained_offset);
2528            if chunks.is_empty() {
2529                self.cold_chunks.remove(stream_id);
2530            }
2531        }
2532        if let Some(objects) = self.external_segments.get_mut(stream_id) {
2533            objects.retain(|object| object.end_offset > retained_offset);
2534            if objects.is_empty() {
2535                self.external_segments.remove(stream_id);
2536            }
2537        }
2538
2539        self.discard_hot_prefix_before(stream_id, retained_offset);
2540    }
2541
2542    fn refresh_hot_start_offset(&mut self, stream_id: &BucketStreamId) {
2543        let Some(hot_start_offset) = self
2544            .hot_segments
2545            .get(stream_id)
2546            .and_then(|segments| segments.iter().map(|segment| segment.start_offset).min())
2547        else {
2548            self.hot_start_offsets.remove(stream_id);
2549            return;
2550        };
2551        if hot_start_offset == 0 {
2552            self.hot_start_offsets.remove(stream_id);
2553        } else {
2554            self.hot_start_offsets
2555                .insert(stream_id.clone(), hot_start_offset);
2556        }
2557    }
2558
2559    fn remove_drained_hot_range(
2560        &mut self,
2561        stream_id: &BucketStreamId,
2562        segment_index: usize,
2563        new_start_offset: u64,
2564        drain_start: usize,
2565        drained_len: usize,
2566    ) {
2567        let Some(segments) = self.hot_segments.get_mut(stream_id) else {
2568            self.hot_start_offsets.remove(stream_id);
2569            return;
2570        };
2571        if segment_index >= segments.len() {
2572            self.refresh_hot_start_offset(stream_id);
2573            return;
2574        }
2575        let drain_end = drain_start + drained_len;
2576        let mut updated_segments = Vec::with_capacity(segments.len());
2577        for (index, mut segment) in segments.drain(..).enumerate() {
2578            if index < segment_index || segment.payload_end <= drain_start {
2579                updated_segments.push(segment);
2580                continue;
2581            }
2582            if segment.payload_start >= drain_end {
2583                segment.payload_start -= drained_len;
2584                segment.payload_end -= drained_len;
2585                updated_segments.push(segment);
2586                continue;
2587            }
2588            if segment.payload_end <= drain_end {
2589                continue;
2590            }
2591            segment.start_offset = new_start_offset;
2592            segment.payload_start = drain_start;
2593            segment.payload_end -= drained_len;
2594            updated_segments.push(segment);
2595        }
2596        *segments = updated_segments;
2597        if segments.is_empty() {
2598            self.hot_segments.remove(stream_id);
2599        }
2600        self.refresh_hot_start_offset(stream_id);
2601    }
2602
2603    fn discard_hot_prefix_before(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
2604        while let Some(segment_index) = self
2605            .hot_segments(stream_id)
2606            .iter()
2607            .position(|segment| segment.start_offset < retained_offset)
2608        {
2609            let segment = self.hot_segments(stream_id)[segment_index].clone();
2610            let new_start_offset = retained_offset.min(segment.end_offset);
2611            let drained_len = usize::try_from(new_start_offset - segment.start_offset)
2612                .expect("drain len fits usize");
2613            if drained_len == 0 {
2614                break;
2615            }
2616            let drain_start = segment.payload_start;
2617            let drain_end = drain_start + drained_len;
2618            self.payloads
2619                .get_mut(stream_id)
2620                .expect("payload vector exists for stream metadata")
2621                .drain(drain_start..drain_end);
2622            self.remove_drained_hot_range(
2623                stream_id,
2624                segment_index,
2625                new_start_offset,
2626                drain_start,
2627                drained_len,
2628            );
2629        }
2630        self.refresh_hot_start_offset(stream_id);
2631    }
2632
2633    fn producer_snapshot(&self, stream_id: &BucketStreamId) -> Vec<ProducerSnapshot> {
2634        let mut producer_states = self
2635            .producers
2636            .get(stream_id)
2637            .into_iter()
2638            .flat_map(|states| states.iter())
2639            .map(|(producer_id, state)| ProducerSnapshot {
2640                producer_id: producer_id.clone(),
2641                producer_epoch: state.producer_epoch,
2642                producer_seq: state.producer_seq,
2643                last_start_offset: state.last_start_offset,
2644                last_next_offset: state.last_next_offset,
2645                last_closed: state.last_closed,
2646                last_items: state.last_items.clone(),
2647            })
2648            .collect::<Vec<_>>();
2649        producer_states.sort_by(|left, right| left.producer_id.cmp(&right.producer_id));
2650        producer_states
2651    }
2652
2653    fn evaluate_producer(
2654        &self,
2655        stream_id: &BucketStreamId,
2656        producer: Option<&ProducerRequest>,
2657    ) -> Result<ProducerDecision, StreamResponse> {
2658        let Some(producer) = producer else {
2659            return Ok(ProducerDecision::Accept);
2660        };
2661        let Some(states) = self.producers.get(stream_id) else {
2662            return Ok(ProducerDecision::Accept);
2663        };
2664        let Some(state) = states.get(&producer.producer_id) else {
2665            if producer.producer_seq == 0 {
2666                return Ok(ProducerDecision::Accept);
2667            }
2668            return Err(StreamResponse::error(
2669                StreamErrorCode::ProducerSeqConflict,
2670                format!(
2671                    "producer '{}' expected sequence 0, received {}",
2672                    producer.producer_id, producer.producer_seq
2673                ),
2674            ));
2675        };
2676
2677        if producer.producer_epoch < state.producer_epoch {
2678            return Err(StreamResponse::error(
2679                StreamErrorCode::ProducerEpochStale,
2680                format!(
2681                    "producer '{}' epoch {} is stale; current epoch is {}",
2682                    producer.producer_id, producer.producer_epoch, state.producer_epoch
2683                ),
2684            ));
2685        }
2686        if producer.producer_epoch > state.producer_epoch {
2687            if producer.producer_seq == 0 {
2688                return Ok(ProducerDecision::Accept);
2689            }
2690            return Err(StreamResponse::error(
2691                StreamErrorCode::InvalidProducer,
2692                format!(
2693                    "producer '{}' new epoch {} must start at sequence 0",
2694                    producer.producer_id, producer.producer_epoch
2695                ),
2696            ));
2697        }
2698
2699        if producer.producer_seq <= state.producer_seq {
2700            return Ok(ProducerDecision::Duplicate {
2701                offset: state.last_start_offset,
2702                next_offset: state.last_next_offset,
2703                closed: state.last_closed,
2704                producer: ProducerRequest {
2705                    producer_id: producer.producer_id.clone(),
2706                    producer_epoch: state.producer_epoch,
2707                    producer_seq: state.producer_seq,
2708                },
2709                items: state.last_items.clone(),
2710            });
2711        }
2712        if producer.producer_seq == state.producer_seq + 1 {
2713            return Ok(ProducerDecision::Accept);
2714        }
2715        Err(StreamResponse::error(
2716            StreamErrorCode::ProducerSeqConflict,
2717            format!(
2718                "producer '{}' expected sequence {}, received {}",
2719                producer.producer_id,
2720                state.producer_seq + 1,
2721                producer.producer_seq
2722            ),
2723        ))
2724    }
2725
2726    fn record_producer_success(
2727        &mut self,
2728        stream_id: BucketStreamId,
2729        producer: ProducerRequest,
2730        last: ProducerAppendRecord,
2731        last_items: Vec<ProducerAppendRecord>,
2732    ) {
2733        self.producers.entry(stream_id).or_default().insert(
2734            producer.producer_id,
2735            ProducerState {
2736                producer_epoch: producer.producer_epoch,
2737                producer_seq: producer.producer_seq,
2738                last_start_offset: last.start_offset,
2739                last_next_offset: last.next_offset,
2740                last_closed: last.closed,
2741                last_items,
2742            },
2743        );
2744    }
2745}
2746
/// Inputs for creating a stream whose initial payload is carried inline.
///
/// The fields match, name for name and type for type, those of
/// `StreamCommand::CreateStream`.
#[derive(Debug)]
struct CreateStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    // Inline initial bytes; `initial_len` reports their length.
    initial_payload: Vec<u8>,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    stream_ttl_seconds: Option<u64>,
    stream_expires_at_ms: Option<u64>,
    forked_from: Option<BucketStreamId>,
    fork_offset: Option<u64>,
    now_ms: u64,
}
2761
/// Inputs for creating a stream whose initial payload is given as an
/// `ExternalPayloadRef` rather than inline bytes.
///
/// The fields match, name for name and type for type, those of
/// `StreamCommand::CreateExternal`.
#[derive(Debug)]
struct CreateExternalStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    // Reference to the initial payload instead of the bytes themselves.
    initial_payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    stream_ttl_seconds: Option<u64>,
    stream_expires_at_ms: Option<u64>,
    forked_from: Option<BucketStreamId>,
    fork_offset: Option<u64>,
    now_ms: u64,
}
2776
/// Result of evaluating a request's producer information against the
/// producer state recorded for a stream (see `evaluate_producer`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProducerDecision {
    /// The request is not a duplicate and may be applied.
    Accept,
    /// The request's sequence is at or below the recorded one for the same
    /// epoch; the fields carry the recorded result of the producer's last
    /// append.
    Duplicate {
        offset: u64,
        next_offset: u64,
        closed: bool,
        // Request id combined with the recorded epoch and sequence.
        producer: ProducerRequest,
        items: Vec<ProducerAppendRecord>,
    },
}
2788
2789impl CreateStreamInput {
2790    fn initial_len(&self) -> u64 {
2791        u64::try_from(self.initial_payload.len()).expect("payload len fits u64")
2792    }
2793}
2794
2795fn status_from_closed(closed: bool) -> StreamStatus {
2796    if closed {
2797        StreamStatus::Closed
2798    } else {
2799        StreamStatus::Open
2800    }
2801}
2802
2803fn is_soft_deleted(stream: &StreamMetadata) -> bool {
2804    stream.status == StreamStatus::SoftDeleted
2805}
2806
2807fn validate_retention(
2808    stream_ttl_seconds: Option<u64>,
2809    stream_expires_at_ms: Option<u64>,
2810) -> Result<(), StreamResponse> {
2811    if stream_ttl_seconds.is_some() && stream_expires_at_ms.is_some() {
2812        return Err(StreamResponse::error(
2813            StreamErrorCode::InvalidRetention,
2814            "stream ttl and expires-at cannot both be set",
2815        ));
2816    }
2817    if let Some(ttl_seconds) = stream_ttl_seconds
2818        && ttl_seconds.checked_mul(1000).is_none()
2819    {
2820        return Err(StreamResponse::error(
2821            StreamErrorCode::InvalidRetention,
2822            "stream ttl overflows millisecond range",
2823        ));
2824    }
2825    Ok(())
2826}
2827
2828fn stream_expiry_at_ms(stream: &StreamMetadata) -> Option<u64> {
2829    if let Some(expires_at_ms) = stream.stream_expires_at_ms {
2830        return Some(expires_at_ms);
2831    }
2832    stream.stream_ttl_seconds.map(|ttl_seconds| {
2833        stream
2834            .last_ttl_touch_at_ms
2835            .saturating_add(ttl_seconds.saturating_mul(1000))
2836    })
2837}
2838
2839fn stream_is_expired(stream: &StreamMetadata, now_ms: u64) -> bool {
2840    stream_expiry_at_ms(stream).is_some_and(|expires_at_ms| now_ms >= expires_at_ms)
2841}
2842
2843fn renew_stream_ttl(stream: &mut StreamMetadata, now_ms: u64) {
2844    if stream.stream_ttl_seconds.is_some() && stream.stream_expires_at_ms.is_none() {
2845        stream.last_ttl_touch_at_ms = now_ms;
2846    }
2847}
2848
2849fn check_stream_seq(stream: &StreamMetadata, incoming: Option<&str>) -> Result<(), StreamResponse> {
2850    let Some(incoming) = incoming else {
2851        return Ok(());
2852    };
2853    if let Some(last) = stream.last_stream_seq.as_deref()
2854        && incoming <= last
2855    {
2856        return Err(StreamResponse::error_with_next_offset(
2857            StreamErrorCode::StreamSeqConflict,
2858            format!("stream sequence '{incoming}' is not greater than last sequence '{last}'"),
2859            stream.tail_offset,
2860        ));
2861    }
2862    Ok(())
2863}
2864
2865fn validate_producer_request(producer: Option<&ProducerRequest>) -> Result<(), StreamResponse> {
2866    let Some(producer) = producer else {
2867        return Ok(());
2868    };
2869    if producer.producer_id.trim().is_empty() {
2870        return Err(StreamResponse::error(
2871            StreamErrorCode::InvalidProducer,
2872            "producer id must not be empty",
2873        ));
2874    }
2875    const MAX_JS_SAFE_INTEGER: u64 = 9_007_199_254_740_991;
2876    if producer.producer_epoch > MAX_JS_SAFE_INTEGER {
2877        return Err(StreamResponse::error(
2878            StreamErrorCode::InvalidProducer,
2879            format!(
2880                "producer epoch {} exceeds maximum {}",
2881                producer.producer_epoch, MAX_JS_SAFE_INTEGER
2882            ),
2883        ));
2884    }
2885    if producer.producer_seq > MAX_JS_SAFE_INTEGER {
2886        return Err(StreamResponse::error(
2887            StreamErrorCode::InvalidProducer,
2888            format!(
2889                "producer sequence {} exceeds maximum {}",
2890                producer.producer_seq, MAX_JS_SAFE_INTEGER
2891            ),
2892        ));
2893    }
2894    Ok(())
2895}
2896
2897fn validate_external_payload_ref(payload: &ExternalPayloadRef) -> Result<(), StreamResponse> {
2898    if payload.s3_path.trim().is_empty() {
2899        return Err(StreamResponse::error(
2900            StreamErrorCode::InvalidColdFlush,
2901            "external payload S3 path must not be empty",
2902        ));
2903    }
2904    if payload.payload_len == 0 {
2905        return Err(StreamResponse::error(
2906            StreamErrorCode::EmptyAppend,
2907            "external payload length must be greater than zero",
2908        ));
2909    }
2910    if payload.object_size < payload.payload_len {
2911        return Err(StreamResponse::error(
2912            StreamErrorCode::InvalidColdFlush,
2913            "external payload object size must cover payload length",
2914        ));
2915    }
2916    Ok(())
2917}
2918
2919fn restore_producer_states(
2920    stream_id: &BucketStreamId,
2921    snapshots: Vec<ProducerSnapshot>,
2922) -> Result<HashMap<String, ProducerState>, StreamSnapshotError> {
2923    let mut states = HashMap::with_capacity(snapshots.len());
2924    for snapshot in snapshots {
2925        if states
2926            .insert(
2927                snapshot.producer_id.clone(),
2928                ProducerState {
2929                    producer_epoch: snapshot.producer_epoch,
2930                    producer_seq: snapshot.producer_seq,
2931                    last_start_offset: snapshot.last_start_offset,
2932                    last_next_offset: snapshot.last_next_offset,
2933                    last_closed: snapshot.last_closed,
2934                    last_items: snapshot.last_items,
2935                },
2936            )
2937            .is_some()
2938        {
2939            return Err(StreamSnapshotError::DuplicateProducer {
2940                stream_id: stream_id.clone(),
2941                producer_id: snapshot.producer_id,
2942            });
2943        }
2944    }
2945    Ok(states)
2946}
2947
2948fn valid_cold_chunk_ref(chunk: &ColdChunkRef) -> bool {
2949    chunk.end_offset > chunk.start_offset
2950        && !chunk.s3_path.trim().is_empty()
2951        && chunk.object_size >= chunk.end_offset - chunk.start_offset
2952}
2953
/// Returns `true` when the object covers a non-empty offset range, has a
/// non-blank S3 path, and its object size is at least the range length.
fn valid_object_payload_ref(object: &ObjectPayloadRef) -> bool {
    if object.end_offset <= object.start_offset || object.s3_path.trim().is_empty() {
        return false;
    }
    // Safe subtraction: end_offset > start_offset was established above.
    let logical_len = object.end_offset - object.start_offset;
    logical_len <= object.object_size
}
2959
2960fn hot_segments_match_payload(segments: &[HotPayloadSegment], payload_len: usize) -> bool {
2961    let mut expected_payload_start = 0;
2962    for segment in segments {
2963        if segment.end_offset <= segment.start_offset
2964            || segment.payload_start != expected_payload_start
2965            || segment.payload_end <= segment.payload_start
2966            || segment.payload_end > payload_len
2967        {
2968            return false;
2969        }
2970        let Ok(logical_len) = usize::try_from(segment.end_offset - segment.start_offset) else {
2971            return false;
2972        };
2973        if logical_len != segment.payload_end - segment.payload_start {
2974            return false;
2975        }
2976        expected_payload_start = segment.payload_end;
2977    }
2978    expected_payload_start == payload_len
2979}
2980
2981fn payload_sources_cover_retained_suffix(
2982    cold_chunks: &[ColdChunkRef],
2983    external_segments: &[ObjectPayloadRef],
2984    hot_segments: &[HotPayloadSegment],
2985    retained_offset: u64,
2986    tail_offset: u64,
2987) -> bool {
2988    if tail_offset < retained_offset {
2989        return false;
2990    }
2991    let mut ranges =
2992        Vec::with_capacity(cold_chunks.len() + external_segments.len() + hot_segments.len());
2993    for chunk in cold_chunks {
2994        if !valid_cold_chunk_ref(chunk) {
2995            return false;
2996        }
2997        ranges.push((chunk.start_offset, chunk.end_offset));
2998    }
2999    for object in external_segments {
3000        if !valid_object_payload_ref(object) {
3001            return false;
3002        }
3003        ranges.push((object.start_offset, object.end_offset));
3004    }
3005    for segment in hot_segments {
3006        if segment.end_offset <= segment.start_offset {
3007            return false;
3008        }
3009        ranges.push((segment.start_offset, segment.end_offset));
3010    }
3011    ranges.sort_unstable();
3012
3013    let mut expected_start = retained_offset;
3014    for (start_offset, end_offset) in ranges {
3015        if end_offset <= expected_start {
3016            continue;
3017        }
3018        if start_offset > expected_start {
3019            return false;
3020        }
3021        expected_start = end_offset;
3022        if expected_start >= tail_offset {
3023            return true;
3024        }
3025    }
3026    expected_start == tail_offset
3027}
3028
3029fn segments_cover_range(
3030    segments: &[(u64, StreamReadSegment)],
3031    offset: u64,
3032    next_offset: u64,
3033) -> bool {
3034    if next_offset < offset {
3035        return false;
3036    }
3037    let mut expected_start = offset;
3038    for (segment_start, segment) in segments {
3039        let Some(segment_end) = read_segment_end(*segment_start, segment) else {
3040            return false;
3041        };
3042        if segment_end <= expected_start {
3043            continue;
3044        }
3045        if *segment_start > expected_start {
3046            return false;
3047        }
3048        expected_start = segment_end;
3049        if expected_start >= next_offset {
3050            return true;
3051        }
3052    }
3053    expected_start == next_offset
3054}
3055
3056fn read_segment_end(segment_start: u64, segment: &StreamReadSegment) -> Option<u64> {
3057    match segment {
3058        StreamReadSegment::Object(object) => {
3059            if object.len == 0
3060                || object.read_start_offset != segment_start
3061                || object.read_start_offset < object.object.start_offset
3062            {
3063                return None;
3064            }
3065            let len = u64::try_from(object.len).ok()?;
3066            let segment_end = object.read_start_offset.checked_add(len)?;
3067            if segment_end > object.object.end_offset {
3068                return None;
3069            }
3070            Some(segment_end)
3071        }
3072        StreamReadSegment::Hot(payload) => {
3073            if payload.is_empty() {
3074                return None;
3075            }
3076            let len = u64::try_from(payload.len()).ok()?;
3077            segment_start.checked_add(len)
3078        }
3079    }
3080}
3081
3082fn message_records_cover_retained_suffix(
3083    records: &[StreamMessageRecord],
3084    retained_offset: u64,
3085    tail_offset: u64,
3086) -> bool {
3087    let mut expected_start = retained_offset;
3088    for record in records {
3089        if record.start_offset != expected_start || record.end_offset <= record.start_offset {
3090            return false;
3091        }
3092        expected_start = record.end_offset;
3093    }
3094    expected_start == tail_offset
3095}
3096
3097fn compare_stream_ids(left: &BucketStreamId, right: &BucketStreamId) -> std::cmp::Ordering {
3098    left.bucket_id
3099        .cmp(&right.bucket_id)
3100        .then_with(|| left.stream_id.cmp(&right.stream_id))
3101}
3102
/// Validates a bucket id.
///
/// # Errors
///
/// Returns a human-readable message when the id is not 4 to 64 bytes long, or
/// when it contains a byte other than an ASCII lowercase letter, an ASCII
/// digit, `_` or `-`. The length check runs first.
pub fn validate_bucket_id(bucket_id: &str) -> Result<(), String> {
    let byte_len = bucket_id.len();
    if !(4..=64).contains(&byte_len) {
        return Err(format!(
            "bucket_id must be 4 to 64 bytes, got {} bytes",
            byte_len
        ));
    }
    let charset_ok = bucket_id
        .bytes()
        .all(|byte| matches!(byte, b'a'..=b'z' | b'0'..=b'9' | b'_' | b'-'));
    if charset_ok {
        Ok(())
    } else {
        Err("bucket_id must match ^[a-z0-9_-]{4,64}$".to_owned())
    }
}
3117
3118pub fn validate_stream_id(stream_id: &BucketStreamId) -> Result<(), String> {
3119    let local = stream_id.stream_id.as_str();
3120    if local.is_empty() {
3121        return Err("stream_id must not be empty".to_owned());
3122    }
3123    if local.len() > 122 {
3124        return Err(format!(
3125            "stream_id must not exceed 122 bytes, got {} bytes",
3126            local.len()
3127        ));
3128    }
3129    if local == "streams" {
3130        return Err("stream_id 'streams' is reserved".to_owned());
3131    }
3132    if local.contains('/') || local.contains('\0') || local.contains("..") {
3133        return Err("stream_id must not contain '/', NUL, or '..'".to_owned());
3134    }
3135    let combined_len = stream_id.bucket_id.len() + 1 + local.len();
3136    if combined_len > 122 {
3137        return Err(format!(
3138            "bucket_id/stream_id must not exceed 122 bytes, got {combined_len} bytes"
3139        ));
3140    }
3141    Ok(())
3142}
3143
3144#[cfg(test)]
3145mod tests {
3146    use super::*;
3147
3148    fn stream(id: &str) -> BucketStreamId {
3149        BucketStreamId::new("benchcmp", id)
3150    }
3151
3152    fn create_bucket(machine: &mut StreamStateMachine) {
3153        assert_eq!(
3154            machine.apply(StreamCommand::CreateBucket {
3155                bucket_id: "benchcmp".to_owned(),
3156            }),
3157            StreamResponse::BucketCreated {
3158                bucket_id: "benchcmp".to_owned(),
3159            }
3160        );
3161    }
3162
3163    fn create_stream(machine: &mut StreamStateMachine, id: &str) {
3164        assert_eq!(
3165            machine.apply(StreamCommand::CreateStream {
3166                stream_id: stream(id),
3167                content_type: "application/octet-stream".to_owned(),
3168                initial_payload: Vec::new(),
3169                close_after: false,
3170                stream_seq: None,
3171                producer: None,
3172                stream_ttl_seconds: None,
3173                stream_expires_at_ms: None,
3174                forked_from: None,
3175                fork_offset: None,
3176                now_ms: 0,
3177            }),
3178            StreamResponse::Created {
3179                stream_id: stream(id),
3180                next_offset: 0,
3181                closed: false,
3182            }
3183        );
3184    }
3185
3186    fn producer(id: &str, epoch: u64, seq: u64) -> ProducerRequest {
3187        ProducerRequest {
3188            producer_id: id.to_owned(),
3189            producer_epoch: epoch,
3190            producer_seq: seq,
3191        }
3192    }
3193
3194    #[test]
3195    fn stream_create_requires_existing_bucket_and_valid_ids() {
3196        let mut machine = StreamStateMachine::new();
3197
3198        assert!(matches!(
3199            machine.apply(StreamCommand::CreateBucket {
3200                bucket_id: "Bad".to_owned(),
3201            }),
3202            StreamResponse::Error {
3203                code: StreamErrorCode::InvalidBucketId,
3204                ..
3205            }
3206        ));
3207        assert!(matches!(
3208            machine.apply(StreamCommand::CreateStream {
3209                stream_id: stream("s-1"),
3210                content_type: "application/octet-stream".to_owned(),
3211                initial_payload: Vec::new(),
3212                close_after: false,
3213                stream_seq: None,
3214                producer: None,
3215                stream_ttl_seconds: None,
3216                stream_expires_at_ms: None,
3217                forked_from: None,
3218                fork_offset: None,
3219                now_ms: 0,
3220            }),
3221            StreamResponse::Error {
3222                code: StreamErrorCode::BucketNotFound,
3223                ..
3224            }
3225        ));
3226
3227        create_bucket(&mut machine);
3228        assert!(matches!(
3229            machine.apply(StreamCommand::CreateStream {
3230                stream_id: stream("streams"),
3231                content_type: "application/octet-stream".to_owned(),
3232                initial_payload: Vec::new(),
3233                close_after: false,
3234                stream_seq: None,
3235                producer: None,
3236                stream_ttl_seconds: None,
3237                stream_expires_at_ms: None,
3238                forked_from: None,
3239                fork_offset: None,
3240                now_ms: 0,
3241            }),
3242            StreamResponse::Error {
3243                code: StreamErrorCode::InvalidStreamId,
3244                ..
3245            }
3246        ));
3247    }
3248
3249    #[test]
3250    fn create_stream_is_idempotent_only_when_metadata_matches() {
3251        let mut machine = StreamStateMachine::new();
3252        create_bucket(&mut machine);
3253        create_stream(&mut machine, "s-1");
3254
3255        assert_eq!(
3256            machine.apply(StreamCommand::CreateStream {
3257                stream_id: stream("s-1"),
3258                content_type: "application/octet-stream".to_owned(),
3259                initial_payload: vec![0; 99],
3260                close_after: false,
3261                stream_seq: None,
3262                producer: None,
3263                stream_ttl_seconds: None,
3264                stream_expires_at_ms: None,
3265                forked_from: None,
3266                fork_offset: None,
3267                now_ms: 0,
3268            }),
3269            StreamResponse::AlreadyExists {
3270                next_offset: 0,
3271                closed: false,
3272                content_type: "application/octet-stream".to_owned(),
3273                stream_ttl_seconds: None,
3274                stream_expires_at_ms: None,
3275            }
3276        );
3277
3278        assert!(matches!(
3279            machine.apply(StreamCommand::CreateStream {
3280                stream_id: stream("s-1"),
3281                content_type: "text/plain".to_owned(),
3282                initial_payload: Vec::new(),
3283                close_after: false,
3284                stream_seq: None,
3285                producer: None,
3286                stream_ttl_seconds: None,
3287                stream_expires_at_ms: None,
3288                forked_from: None,
3289                fork_offset: None,
3290                now_ms: 0,
3291            }),
3292            StreamResponse::Error {
3293                code: StreamErrorCode::StreamAlreadyExistsConflict,
3294                ..
3295            }
3296        ));
3297    }
3298
/// A successful append reports the byte range it wrote; a following append that names a
/// different content type is rejected and the tail offset stays where it was.
#[test]
fn append_advances_offsets_and_checks_content_type() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "s-1");
    let target = stream("s-1");

    // Builds an append command that varies only in content type and payload.
    let append_with = |content_type: &str, payload: &[u8]| StreamCommand::Append {
        stream_id: target.clone(),
        content_type: Some(content_type.to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    let accepted = state.apply(append_with("application/octet-stream", b"abcdefg"));
    assert_eq!(
        accepted,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 7,
            closed: false,
            deduplicated: false,
            producer: None,
        }
    );

    // The mismatching append must fail while still reporting the unchanged next offset.
    let rejected = state.apply(append_with("text/plain", b"x"));
    assert!(matches!(
        rejected,
        StreamResponse::Error {
            code: StreamErrorCode::ContentTypeMismatch,
            next_offset: Some(7),
            ..
        }
    ));

    let tail_offset = state.head(&target).expect("stream").tail_offset;
    assert_eq!(tail_offset, 7);
}
3341
/// Exercises `read` at three positions of a seven-byte stream: inside the payload, exactly at
/// the tail, and one byte past the tail.
#[test]
fn catch_up_read_returns_payload_slice_and_bounds_errors() {
    const OCTET_STREAM: &str = "application/octet-stream";

    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "s-1");
    let target = stream("s-1");

    let seeded = state.apply(StreamCommand::Append {
        stream_id: target.clone(),
        content_type: Some(OCTET_STREAM.to_owned()),
        payload: b"abcdefg".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    });
    assert!(matches!(seeded, StreamResponse::Appended { .. }));

    // A bounded read from the middle returns only the requested slice.
    let middle = state.read(&target, 2, 3).expect("read");
    assert_eq!(
        middle,
        StreamRead {
            offset: 2,
            next_offset: 5,
            content_type: OCTET_STREAM.to_owned(),
            payload: b"cde".to_vec(),
            up_to_date: false,
            closed: false,
        }
    );

    // Reading at the tail succeeds with an empty payload and `up_to_date` set.
    let at_tail = state.read(&target, 7, 16).expect("tail read");
    assert_eq!(
        at_tail,
        StreamRead {
            offset: 7,
            next_offset: 7,
            content_type: OCTET_STREAM.to_owned(),
            payload: Vec::new(),
            up_to_date: true,
            closed: false,
        }
    );

    // One byte beyond the tail is an error that still carries the current next offset.
    let past_tail = state.read(&target, 8, 1);
    assert!(matches!(
        past_tail,
        Err(StreamResponse::Error {
            code: StreamErrorCode::OffsetOutOfRange,
            next_offset: Some(7),
            ..
        })
    ));
}
3391
/// Flushes the first four of six bytes to a cold chunk and then checks the read plan, the
/// plain `read` results on both sides of the hot start, and a snapshot/restore round trip.
#[test]
fn flush_cold_moves_hot_prefix_to_manifest_and_read_plan_splits() {
    const OCTET_STREAM: &str = "application/octet-stream";

    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "cold");
    let cold = stream("cold");

    let appended = state.apply(StreamCommand::Append {
        stream_id: cold.clone(),
        content_type: Some(OCTET_STREAM.to_owned()),
        payload: b"abcdef".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    });
    assert!(matches!(
        appended,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 6,
            ..
        }
    ));

    let planned = state
        .plan_cold_flush(&cold, 4, 4)
        .expect("plan cold flush")
        .expect("cold flush candidate");
    assert_eq!(planned.start_offset, 0);
    assert_eq!(planned.end_offset, 4);
    assert_eq!(planned.payload, b"abcd");

    // Commit the planned range as a cold chunk.
    let flushed = state.apply(StreamCommand::FlushCold {
        stream_id: cold.clone(),
        chunk: ColdChunkRef {
            start_offset: planned.start_offset,
            end_offset: planned.end_offset,
            s3_path: "s3://bucket/cold/000000".to_owned(),
            object_size: u64::try_from(planned.payload.len()).unwrap(),
        },
    });
    assert_eq!(
        flushed,
        StreamResponse::ColdFlushed {
            hot_start_offset: 4,
        }
    );
    assert_eq!(state.hot_start_offset(&cold), 4);
    assert_eq!(state.cold_chunks(&cold).len(), 1);

    // A read plan that straddles the flush boundary has one object segment and one hot segment.
    let plan = state.read_plan(&cold, 2, 4).expect("read plan");
    assert_eq!(plan.next_offset, 6);
    assert_eq!(plan.segments.len(), 2);

    let cold_part = &plan.segments[0];
    if let StreamReadSegment::Object(segment) = cold_part {
        assert_eq!(segment.read_start_offset, 2);
        assert_eq!(segment.len, 2);
    } else {
        panic!("expected cold object segment, got {cold_part:?}");
    }

    let hot_part = &plan.segments[1];
    if let StreamReadSegment::Hot(payload) = hot_part {
        assert_eq!(payload, b"ef");
    } else {
        panic!("expected hot segment, got {hot_part:?}");
    }

    // A plain `read` that touches the flushed range is refused.
    let cold_read = state.read(&cold, 0, 6);
    assert_eq!(
        cold_read,
        Err(StreamResponse::Error {
            code: StreamErrorCode::InvalidColdFlush,
            message: "stream 'benchcmp/cold' read requires object payload store".to_owned(),
            next_offset: Some(6),
        })
    );

    // The same hot-tail read is expected before and after the snapshot round trip.
    let expected_hot_tail = || StreamRead {
        offset: 4,
        next_offset: 6,
        content_type: OCTET_STREAM.to_owned(),
        payload: b"ef".to_vec(),
        up_to_date: true,
        closed: false,
    };
    assert_eq!(
        state.read(&cold, 4, 8).expect("hot read"),
        expected_hot_tail()
    );

    let restored = StreamStateMachine::restore(state.snapshot()).expect("restore snapshot");
    assert_eq!(restored.hot_start_offset(&cold), 4);
    assert_eq!(restored.cold_chunks(&cold).len(), 1);
    assert_eq!(
        restored.read(&cold, 4, 8).expect("hot read"),
        expected_hot_tail()
    );
}
3487
/// Appends two payloads back to back and flushes them as a single cold chunk covering 0..5,
/// then checks that nothing hot remains and that a full read plan is one object segment.
#[test]
fn flush_cold_can_coalesce_contiguous_hot_segments() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "cold-coalesced");
    let id = stream("cold-coalesced");

    // Builds a plain append of `payload` to the stream under test.
    let append = |payload: &[u8]| StreamCommand::Append {
        stream_id: id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    let first = state.apply(append(b"abc"));
    assert!(matches!(
        first,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 3,
            ..
        }
    ));
    let second = state.apply(append(b"de"));
    assert!(matches!(
        second,
        StreamResponse::Appended {
            offset: 3,
            next_offset: 5,
            ..
        }
    ));

    let flushed = state.apply(StreamCommand::FlushCold {
        stream_id: id.clone(),
        chunk: ColdChunkRef {
            start_offset: 0,
            end_offset: 5,
            s3_path: "s3://bucket/cold-coalesced/000000".to_owned(),
            object_size: 5,
        },
    });
    // NOTE(review): other tests in this file expect `hot_start_offset` to equal the flushed
    // chunk's end offset (4 and 3); here a flush of 0..5 is expected to report 0 - confirm this
    // is the intended value when the whole hot range is flushed.
    assert_eq!(
        flushed,
        StreamResponse::ColdFlushed {
            hot_start_offset: 0,
        }
    );
    assert!(state.hot_segments(&id).is_empty());
    assert_eq!(state.hot_payload_len(&id), Ok(0));
    assert_eq!(state.cold_chunks(&id).len(), 1);

    let plan = state.read_plan(&id, 0, 5).expect("read plan");
    assert_eq!(plan.next_offset, 5);
    assert_eq!(plan.segments.len(), 1);

    let only_segment = &plan.segments[0];
    if let StreamReadSegment::Object(segment) = only_segment {
        assert_eq!(segment.read_start_offset, 0);
        assert_eq!(segment.len, 5);
    } else {
        panic!("expected cold object segment, got {only_segment:?}");
    }
}
3557
/// After three two-byte appends, `plan_cold_flush` must yield a candidate for a 4-byte request
/// and, for a 5-byte request, a candidate spanning 0..5 with payload `abcde`.
#[test]
fn plan_cold_flush_coalesces_contiguous_hot_segments() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "cold-planned-coalesced");
    let id = stream("cold-planned-coalesced");

    let chunks = [b"ab".as_slice(), b"cd".as_slice(), b"ef".as_slice()];
    for chunk in chunks {
        let response = state.apply(StreamCommand::Append {
            stream_id: id.clone(),
            content_type: Some("application/octet-stream".to_owned()),
            payload: chunk.to_vec(),
            close_after: false,
            stream_seq: None,
            producer: None,
            now_ms: 0,
        });
        assert!(matches!(response, StreamResponse::Appended { .. }));
    }

    let four_byte_plan = state.plan_cold_flush(&id, 4, 4).expect("plan cold flush");
    assert!(
        four_byte_plan.is_some(),
        "planner should consider contiguous small hot segments together"
    );

    // No single append is five bytes long, so this range has to cross append boundaries.
    let five_byte_plan = state
        .plan_cold_flush(&id, 5, 5)
        .expect("plan cold flush")
        .expect("candidate");
    assert_eq!(five_byte_plan.start_offset, 0);
    assert_eq!(five_byte_plan.end_offset, 5);
    assert_eq!(five_byte_plan.payload, b"abcde");
}
3593
/// With two streams that each hold four hot bytes, `plan_next_cold_flush` is expected to pick
/// "a-cold" even though "z-cold" was created and appended to first.
#[test]
fn plan_next_cold_flush_selects_deterministic_eligible_stream() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);

    // Listed in the order the streams are created and filled: "z-cold" first, "a-cold" second.
    let fixtures = [
        ("z-cold", b"zzzz".as_slice()),
        ("a-cold", b"aaaa".as_slice()),
    ];
    for (name, _) in fixtures {
        create_stream(&mut state, name);
    }
    for (name, payload) in fixtures {
        let response = state.apply(StreamCommand::Append {
            stream_id: stream(name),
            content_type: Some("application/octet-stream".to_owned()),
            payload: payload.to_vec(),
            close_after: false,
            stream_seq: None,
            producer: None,
            now_ms: 0,
        });
        assert!(matches!(response, StreamResponse::Appended { .. }));
    }

    let chosen = state
        .plan_next_cold_flush(4, 4)
        .expect("plan next cold flush")
        .expect("candidate");
    assert_eq!(chosen.stream_id, stream("a-cold"));
    assert_eq!(chosen.payload, b"aaaa");
}
3632
/// Requests a batch of four one-byte flush candidates from a single four-byte stream and checks
/// that they cover consecutive ranges while the machine's own hot and cold state is untouched.
#[test]
fn plan_next_cold_flush_batch_advances_on_preview_state() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "batched-cold");
    let id = stream("batched-cold");

    let seeded = state.apply(StreamCommand::Append {
        stream_id: id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"abcd".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    });
    assert!(matches!(seeded, StreamResponse::Appended { .. }));

    let batch = state
        .plan_next_cold_flush_batch(1, 1, 4)
        .expect("plan cold flush batch");
    assert_eq!(batch.len(), 4);

    // Each candidate starts where the previous one ended.
    let ranges: Vec<_> = batch
        .iter()
        .map(|planned| (planned.start_offset, planned.end_offset))
        .collect();
    assert_eq!(ranges, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);

    let payloads: Vec<_> = batch
        .iter()
        .map(|planned| planned.payload.as_slice())
        .collect();
    let expected_payloads = vec![
        b"a".as_slice(),
        b"b".as_slice(),
        b"c".as_slice(),
        b"d".as_slice(),
    ];
    assert_eq!(payloads, expected_payloads);

    // Planning alone leaves all four bytes hot and records no cold chunk.
    assert_eq!(state.hot_payload_len(&id), Ok(4));
    assert!(state.cold_chunks(&id).is_empty());
}
3677
/// Plans a flush against an 18-byte stream, deletes and recreates the stream with 17 bytes, and
/// then applies the old plan: it must be rejected and the new payload must read back intact.
#[test]
fn stale_cold_flush_candidate_after_delete_recreate_is_invalid_without_mutation() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "stale-cold");
    let id = stream("stale-cold");

    // Builds a plain append of `payload` to the stream under test.
    let append = |payload: &[u8]| StreamCommand::Append {
        stream_id: id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    let original_append = state.apply(append(b"abcdefghijklmnopqr"));
    assert!(matches!(
        original_append,
        StreamResponse::Appended {
            next_offset: 18,
            ..
        }
    ));
    let stale_plan = state
        .plan_cold_flush(&id, 18, 18)
        .expect("plan cold flush")
        .expect("candidate");

    let deletion = state.apply(StreamCommand::DeleteStream {
        stream_id: id.clone(),
    });
    assert!(matches!(
        deletion,
        StreamResponse::Deleted {
            hard_deleted: true,
            ..
        }
    ));

    // The recreated stream is one byte shorter than the range captured in the stale plan.
    create_stream(&mut state, "stale-cold");
    let recreated_append = state.apply(append(b"abcdefghijklmnopq"));
    assert!(matches!(
        recreated_append,
        StreamResponse::Appended {
            next_offset: 17,
            ..
        }
    ));

    let stale_reply = state.apply(StreamCommand::FlushCold {
        stream_id: id.clone(),
        chunk: ColdChunkRef {
            start_offset: stale_plan.start_offset,
            end_offset: stale_plan.end_offset,
            s3_path: "s3://bucket/stale-cold/old-candidate".to_owned(),
            object_size: u64::try_from(stale_plan.payload.len()).unwrap(),
        },
    });
    match stale_reply {
        StreamResponse::Error {
            code: StreamErrorCode::InvalidColdFlush,
            message,
            next_offset: Some(17),
        } => assert!(message.contains("beyond stream")),
        other => panic!("expected stale invalid cold flush, got {other:?}"),
    }

    // The rejected flush must not have changed what a full read returns.
    let after_rejection = state.read(&id, 0, 32).expect("read");
    assert_eq!(
        after_rejection,
        StreamRead {
            offset: 0,
            next_offset: 17,
            content_type: "application/octet-stream".to_owned(),
            payload: b"abcdefghijklmnopq".to_vec(),
            up_to_date: true,
            closed: false,
        }
    );
}
3758
/// Deletes "a-gone" after adding a fork reference to it (the delete reports
/// `hard_deleted: false`) and checks that the next flush candidate comes from "b-live".
#[test]
fn plan_next_cold_flush_skips_soft_deleted_streams() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);
    create_stream(&mut state, "a-gone");
    create_stream(&mut state, "b-live");
    let gone = stream("a-gone");
    let live = stream("b-live");

    // Builds a plain append of `payload` to the given stream.
    let append_to = |stream_id: &BucketStreamId, payload: &[u8]| StreamCommand::Append {
        stream_id: stream_id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    let gone_append = state.apply(append_to(&gone, b"gone"));
    assert!(matches!(gone_append, StreamResponse::Appended { .. }));

    let fork_ref = state.apply(StreamCommand::AddForkRef {
        stream_id: gone.clone(),
        now_ms: 0,
    });
    assert!(matches!(fork_ref, StreamResponse::ForkRefAdded { .. }));

    let deletion = state.apply(StreamCommand::DeleteStream {
        stream_id: gone.clone(),
    });
    assert_eq!(
        deletion,
        StreamResponse::Deleted {
            hard_deleted: false,
            parent_to_release: None,
        }
    );

    let live_append = state.apply(append_to(&live, b"live"));
    assert!(matches!(live_append, StreamResponse::Appended { .. }));

    let chosen = state
        .plan_next_cold_flush(4, 4)
        .expect("plan next cold flush")
        .expect("candidate");
    assert_eq!(chosen.stream_id, live);
    assert_eq!(chosen.payload, b"live");
}
3813
/// Checks the per-stream and total hot byte counts before and after flushing three of the four
/// bytes held by "hot-a".
#[test]
fn hot_payload_byte_metrics_follow_cold_flush() {
    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);

    let fixtures = [("hot-a", b"abcd".as_slice()), ("hot-b", b"xy".as_slice())];
    for (name, _) in fixtures {
        create_stream(&mut state, name);
    }
    for (name, payload) in fixtures {
        let response = state.apply(StreamCommand::Append {
            stream_id: stream(name),
            content_type: Some("application/octet-stream".to_owned()),
            payload: payload.to_vec(),
            close_after: false,
            stream_seq: None,
            producer: None,
            now_ms: 0,
        });
        assert!(matches!(response, StreamResponse::Appended { .. }));
    }

    let hot_a = stream("hot-a");
    let hot_b = stream("hot-b");
    assert_eq!(state.hot_payload_len(&hot_a), Ok(4));
    assert_eq!(state.hot_payload_len(&hot_b), Ok(2));
    assert_eq!(state.total_hot_payload_bytes(), 6);

    let flushed = state.apply(StreamCommand::FlushCold {
        stream_id: hot_a.clone(),
        chunk: ColdChunkRef {
            start_offset: 0,
            end_offset: 3,
            s3_path: "s3://bucket/hot-a/000000".to_owned(),
            object_size: 3,
        },
    });
    assert_eq!(
        flushed,
        StreamResponse::ColdFlushed {
            hot_start_offset: 3,
        }
    );

    // Three bytes left the hot set: one remains in "hot-a" plus the two in "hot-b".
    assert_eq!(state.hot_payload_len(&hot_a), Ok(1));
    assert_eq!(state.total_hot_payload_bytes(), 3);
}
3856
/// Serializes a snapshot holding one open and one closed stream to JSON, restores it, and checks
/// that payload, metadata, stream-sequence checks, and the closed status all carry over.
#[test]
fn snapshot_restore_round_trips_payload_metadata_and_stream_seq() {
    const OCTET_STREAM: &str = "application/octet-stream";

    // Builds a create command; fields not listed as parameters keep the values used throughout.
    let create = |name: &str,
                  payload: &[u8],
                  close_after: bool,
                  seq: Option<&str>,
                  ttl: Option<u64>| StreamCommand::CreateStream {
        stream_id: stream(name),
        content_type: OCTET_STREAM.to_owned(),
        initial_payload: payload.to_vec(),
        close_after,
        stream_seq: seq.map(String::from),
        producer: None,
        stream_ttl_seconds: ttl,
        stream_expires_at_ms: None,
        forked_from: None,
        fork_offset: None,
        now_ms: 0,
    };
    // Builds a non-closing append with an optional stream sequence.
    let append = |name: &str, payload: &[u8], seq: Option<&str>| StreamCommand::Append {
        stream_id: stream(name),
        content_type: Some(OCTET_STREAM.to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: seq.map(String::from),
        producer: None,
        now_ms: 0,
    };

    let mut state = StreamStateMachine::new();
    create_bucket(&mut state);

    let open_created = state.apply(create("snap-open", b"hi", false, Some("0001"), Some(60)));
    assert_eq!(
        open_created,
        StreamResponse::Created {
            stream_id: stream("snap-open"),
            next_offset: 2,
            closed: false,
        }
    );
    let open_appended = state.apply(append("snap-open", b"abc", Some("0002")));
    assert!(matches!(
        open_appended,
        StreamResponse::Appended {
            offset: 2,
            next_offset: 5,
            ..
        }
    ));
    let closed_created = state.apply(create("snap-closed", b"x", true, None, None));
    assert_eq!(
        closed_created,
        StreamResponse::Created {
            stream_id: stream("snap-closed"),
            next_offset: 1,
            closed: true,
        }
    );

    // Round-trip through JSON bytes rather than handing the snapshot over directly.
    let json = serde_json::to_vec(&state.snapshot()).expect("serialize snapshot");
    let snapshot = serde_json::from_slice::<StreamSnapshot>(&json).expect("deserialize snapshot");
    let mut restored = StreamStateMachine::restore(snapshot).expect("restore snapshot");

    let full_read = restored.read(&stream("snap-open"), 0, 16).expect("read");
    assert_eq!(
        full_read,
        StreamRead {
            offset: 0,
            next_offset: 5,
            content_type: OCTET_STREAM.to_owned(),
            payload: b"hiabc".to_vec(),
            up_to_date: true,
            closed: false,
        }
    );
    let metadata = restored.head(&stream("snap-open")).expect("metadata");
    assert_eq!(metadata.last_stream_seq.as_deref(), Some("0002"));
    assert_eq!(metadata.stream_ttl_seconds, Some(60));
    assert_eq!(metadata.stream_expires_at_ms, None);

    // Re-using the last recorded stream sequence is refused after restore.
    let repeated_seq = restored.apply(append("snap-open", b"bad", Some("0002")));
    assert!(matches!(
        repeated_seq,
        StreamResponse::Error {
            code: StreamErrorCode::StreamSeqConflict,
            next_offset: Some(5),
            ..
        }
    ));

    let next_seq = restored.apply(append("snap-open", b"!", Some("0003")));
    assert_eq!(
        next_seq,
        StreamResponse::Appended {
            offset: 5,
            next_offset: 6,
            closed: false,
            deduplicated: false,
            producer: None,
        }
    );

    // The stream created with `close_after: true` still rejects appends after restore.
    let append_to_closed = restored.apply(append("snap-closed", b"!", None));
    assert!(matches!(
        append_to_closed,
        StreamResponse::Error {
            code: StreamErrorCode::StreamClosed,
            next_offset: Some(1),
            ..
        }
    ));
}
3990
/// Creates buckets and streams in a scrambled order and checks the order in which the snapshot
/// lists them.
#[test]
fn snapshot_order_is_deterministic() {
    let mut state = StreamStateMachine::new();

    for bucket in ["zzzz", "benchcmp", "aaaa"] {
        // The response is deliberately not inspected here.
        state.apply(StreamCommand::CreateBucket {
            bucket_id: String::from(bucket),
        });
    }

    let creation_order = [
        ("zzzz", "stream-b"),
        ("benchcmp", "stream-b"),
        ("benchcmp", "stream-a"),
        ("aaaa", "stream-z"),
    ];
    for (bucket, name) in creation_order {
        let created = state.apply(StreamCommand::CreateStream {
            stream_id: BucketStreamId::new(bucket, name),
            content_type: "application/octet-stream".to_owned(),
            initial_payload: Vec::new(),
            close_after: false,
            stream_seq: None,
            producer: None,
            stream_ttl_seconds: None,
            stream_expires_at_ms: None,
            forked_from: None,
            fork_offset: None,
            now_ms: 0,
        });
        assert!(matches!(created, StreamResponse::Created { .. }));
    }

    let snapshot = state.snapshot();
    assert_eq!(snapshot.buckets, ["aaaa", "benchcmp", "zzzz"]);

    let listed_streams: Vec<_> = snapshot
        .streams
        .iter()
        .map(|entry| entry.metadata.stream_id.to_string())
        .collect();
    assert_eq!(
        listed_streams,
        [
            "aaaa/stream-z",
            "benchcmp/stream-a",
            "benchcmp/stream-b",
            "zzzz/stream-b",
        ]
    );
}
4039
/// Feeds `StreamStateMachine::restore` four malformed snapshots and checks the error each one
/// produces: a duplicated bucket, a stream in an unlisted bucket, a payload shorter than the
/// tail offset, and two producer states sharing one producer id.
#[test]
fn snapshot_restore_rejects_invalid_entries() {
    // Metadata for an open stream; only the id and the tail offset differ between cases.
    let open_metadata = |stream_id: BucketStreamId, tail_offset| StreamMetadata {
        stream_id,
        content_type: "application/octet-stream".to_owned(),
        status: StreamStatus::Open,
        tail_offset,
        last_stream_seq: None,
        stream_ttl_seconds: None,
        stream_expires_at_ms: None,
        created_at_ms: 0,
        last_ttl_touch_at_ms: 0,
        forked_from: None,
        fork_offset: None,
        fork_ref_count: 0,
    };
    // A snapshot with the single bucket "benchcmp" and exactly one stream entry.
    let lone_entry_snapshot = |metadata, payload, producer_states| StreamSnapshot {
        buckets: vec!["benchcmp".to_owned()],
        streams: vec![StreamSnapshotEntry {
            metadata,
            hot_start_offset: 0,
            payload,
            hot_segments: Vec::new(),
            cold_chunks: Vec::new(),
            external_segments: Vec::new(),
            message_records: Vec::new(),
            visible_snapshot: None,
            producer_states,
        }],
    };
    // Producer state for "writer-1" with everything but the epoch zeroed.
    let writer_1 = |producer_epoch| ProducerSnapshot {
        producer_id: "writer-1".to_owned(),
        producer_epoch,
        producer_seq: 0,
        last_start_offset: 0,
        last_next_offset: 0,
        last_closed: false,
        last_items: Vec::new(),
    };

    // Case 1: the same bucket id appears twice.
    let duplicate_bucket = StreamStateMachine::restore(StreamSnapshot {
        buckets: vec!["benchcmp".to_owned(), "benchcmp".to_owned()],
        streams: Vec::new(),
    })
    .expect_err("duplicate bucket");
    assert_eq!(
        duplicate_bucket,
        StreamSnapshotError::DuplicateBucket("benchcmp".to_owned())
    );

    // Case 2: the stream's bucket ("missing") is not in the bucket list.
    let missing_bucket = StreamStateMachine::restore(lone_entry_snapshot(
        open_metadata(BucketStreamId::new("missing", "stream"), 0),
        Vec::new(),
        Vec::new(),
    ));
    assert!(matches!(
        missing_bucket,
        Err(StreamSnapshotError::MissingBucket(_))
    ));

    // Case 3: tail offset 2 with a one-byte payload and no cold chunks.
    let bad_length = StreamStateMachine::restore(lone_entry_snapshot(
        open_metadata(stream("bad-len"), 2),
        b"x".to_vec(),
        Vec::new(),
    ));
    assert!(matches!(
        bad_length,
        Err(StreamSnapshotError::PayloadLengthMismatch { .. })
    ));

    // Case 4: two producer states with the same id but different epochs.
    let duplicate_producer = StreamStateMachine::restore(lone_entry_snapshot(
        open_metadata(stream("duplicate-producer"), 0),
        Vec::new(),
        vec![writer_1(0), writer_1(1)],
    ));
    assert!(matches!(
        duplicate_producer,
        Err(StreamSnapshotError::DuplicateProducer { .. })
    ));
}
4163
/// Once an append closes the stream, a follow-up close-only command still
/// succeeds with the same tail offset, and any further append is rejected.
#[test]
fn close_is_monotonic_and_close_only_is_idempotent() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "s-1");

    // Plain append against "s-1": no stream-seq, no producer headers.
    let append = |payload: &[u8], close_after: bool| StreamCommand::Append {
        stream_id: stream("s-1"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    // Append three bytes and close in the same command.
    let closing_append = machine.apply(append(b"abc", true));
    assert_eq!(
        closing_append,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 3,
            closed: true,
            deduplicated: false,
            producer: None,
        }
    );

    // Closing an already-closed stream is answered as a normal close.
    let close_only = machine.apply(StreamCommand::Close {
        stream_id: stream("s-1"),
        stream_seq: None,
        producer: None,
        now_ms: 0,
    });
    assert_eq!(
        close_only,
        StreamResponse::Closed {
            next_offset: 3,
            deduplicated: false,
            producer: None,
        }
    );

    // The closed state never reverts: new data is refused.
    let late_append = machine.apply(append(b"x", false));
    assert!(matches!(
        late_append,
        StreamResponse::Error {
            code: StreamErrorCode::StreamClosed,
            next_offset: Some(3),
            ..
        }
    ));
}
4218
/// Re-using a `stream_seq` value yields a conflict and leaves the tail
/// untouched; a larger value is accepted at the unchanged tail offset.
#[test]
fn stream_seq_must_strictly_increase() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "s-1");

    // Single-byte-style append against "s-1" carrying the given stream-seq.
    let append_with_seq = |payload: &[u8], seq: &str| StreamCommand::Append {
        stream_id: stream("s-1"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: Some(seq.to_owned()),
        producer: None,
        now_ms: 0,
    };

    let first = machine.apply(append_with_seq(b"a", "0002"));
    assert!(matches!(first, StreamResponse::Appended { .. }));

    // Same sequence value again: rejected, tail stays at 1.
    let repeated = machine.apply(append_with_seq(b"b", "0002"));
    assert!(matches!(
        repeated,
        StreamResponse::Error {
            code: StreamErrorCode::StreamSeqConflict,
            next_offset: Some(1),
            ..
        }
    ));

    // A larger sequence value lands right after the first append.
    let advanced = machine.apply(append_with_seq(b"c", "0003"));
    assert!(matches!(
        advanced,
        StreamResponse::Appended {
            offset: 1,
            next_offset: 2,
            ..
        }
    ));
}
4270
/// Exercises producer headers on single appends: an exact retry is
/// deduplicated, a sequence gap conflicts, a newer epoch is accepted, and
/// the older epoch is rejected as stale afterwards.
#[test]
fn producer_headers_deduplicate_retries_and_fence_stale_epochs() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "producer-stream");

    // Append against "producer-stream" on behalf of the given producer request.
    let append_as = |payload: &[u8], request| StreamCommand::Append {
        stream_id: stream("producer-stream"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: Some(request),
        now_ms: 0,
    };
    // Expected success response for an append that leaves the stream open.
    let appended = |offset, next_offset, deduplicated, request| StreamResponse::Appended {
        offset,
        next_offset,
        closed: false,
        deduplicated,
        producer: Some(request),
    };

    let first = machine.apply(append_as(b"a", producer("writer-1", 0, 0)));
    assert_eq!(first, appended(0, 1, false, producer("writer-1", 0, 0)));

    // Same (epoch, seq) again: original offsets are replayed and the body ignored.
    let retry = machine.apply(append_as(
        b"ignored-retry-body",
        producer("writer-1", 0, 0),
    ));
    assert_eq!(retry, appended(0, 1, true, producer("writer-1", 0, 0)));
    assert_eq!(
        machine
            .read(&stream("producer-stream"), 0, 16)
            .expect("read")
            .payload,
        b"a"
    );

    // Skipping from seq 0 to seq 2 is a sequence conflict.
    let gap = machine.apply(append_as(b"gap", producer("writer-1", 0, 2)));
    assert!(matches!(
        gap,
        StreamResponse::Error {
            code: StreamErrorCode::ProducerSeqConflict,
            ..
        }
    ));

    // A higher epoch starting at seq 0 is accepted.
    let new_epoch = machine.apply(append_as(b"b", producer("writer-1", 1, 0)));
    assert_eq!(new_epoch, appended(1, 2, false, producer("writer-1", 1, 0)));

    // The previous epoch is now fenced off.
    let stale = machine.apply(append_as(b"stale", producer("writer-1", 0, 1)));
    assert!(matches!(
        stale,
        StreamResponse::Error {
            code: StreamErrorCode::ProducerEpochStale,
            ..
        }
    ));
}
4371
/// A retried producer batch is reported as deduplicated without growing the
/// stream, and a rejected batch leaves its producer sequence number usable
/// by the next valid batch.
#[test]
fn producer_append_batch_deduplicates_retries_without_partial_mutation() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "producer-batch");

    let content_type = Some("application/octet-stream");

    // First batch: two messages, "ab" then "c".
    let first_payloads: [&[u8]; 2] = [b"ab", b"c"];
    let first = machine
        .append_batch_borrowed(
            stream("producer-batch"),
            content_type,
            &first_payloads,
            Some(producer("writer-1", 0, 0)),
            0,
        )
        .expect("first batch");
    let expected_items = vec![
        StreamBatchAppendItem {
            offset: 0,
            next_offset: 2,
            closed: false,
            deduplicated: false,
        },
        StreamBatchAppendItem {
            offset: 2,
            next_offset: 3,
            closed: false,
            deduplicated: false,
        },
    ];
    assert_eq!(first.items, expected_items);
    assert!(!first.deduplicated);

    // Identical retry: every item is flagged as deduplicated and the stored
    // bytes are unchanged.
    let duplicate = machine
        .append_batch_borrowed(
            stream("producer-batch"),
            content_type,
            &first_payloads,
            Some(producer("writer-1", 0, 0)),
            0,
        )
        .expect("duplicate batch");
    assert!(duplicate.deduplicated);
    assert!(duplicate.items.iter().all(|entry| entry.deduplicated));
    assert_eq!(duplicate.items[0].offset, 0);
    assert_eq!(duplicate.items[1].next_offset, 3);
    assert_eq!(
        machine
            .read(&stream("producer-batch"), 0, 16)
            .expect("read")
            .payload,
        b"abc"
    );

    // A batch containing an empty payload is rejected outright.
    let invalid_payloads: [&[u8]; 1] = [b""];
    let rejected = machine.append_batch_borrowed(
        stream("producer-batch"),
        content_type,
        &invalid_payloads,
        Some(producer("writer-1", 0, 1)),
        0,
    );
    assert!(matches!(
        rejected,
        Err(StreamResponse::Error {
            code: StreamErrorCode::EmptyAppend,
            ..
        })
    ));

    // The same producer seq (1) is still accepted for a valid batch.
    let next_payloads: [&[u8]; 1] = [b"d"];
    let next = machine
        .append_batch_borrowed(
            stream("producer-batch"),
            content_type,
            &next_payloads,
            Some(producer("writer-1", 0, 1)),
            0,
        )
        .expect("next batch");
    assert_eq!(next.items[0].offset, 3);
    assert_eq!(
        machine
            .read(&stream("producer-batch"), 0, 16)
            .expect("read")
            .payload,
        b"abcd"
    );
}
4462
/// Producer bookkeeping is captured in the snapshot, so a machine restored
/// from it still deduplicates a retry and accepts the next sequence number.
#[test]
fn producer_state_survives_snapshot_restore() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "producer-snapshot");

    // Append against "producer-snapshot" on behalf of the given producer request.
    let append_as = |payload: &[u8], request| StreamCommand::Append {
        stream_id: stream("producer-snapshot"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: Some(request),
        now_ms: 0,
    };

    let original = machine.apply(append_as(b"a", producer("writer-1", 0, 0)));
    assert!(matches!(
        original,
        StreamResponse::Appended {
            deduplicated: false,
            ..
        }
    ));

    // The snapshot carries exactly one producer with its last append record.
    let snapshot = machine.snapshot();
    let producer_states = &snapshot.streams[0].producer_states;
    assert_eq!(producer_states.len(), 1);
    assert_eq!(
        producer_states[0].last_items,
        vec![ProducerAppendRecord {
            start_offset: 0,
            next_offset: 1,
            closed: false,
        }]
    );
    let mut restored = StreamStateMachine::restore(snapshot).expect("restore snapshot");

    // Retrying the pre-snapshot append on the restored machine is deduplicated.
    let retry = restored.apply(append_as(b"retry", producer("writer-1", 0, 0)));
    assert!(matches!(
        retry,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 1,
            deduplicated: true,
            ..
        }
    ));

    // The following sequence number continues from the restored state.
    let follow_up = restored.apply(append_as(b"b", producer("writer-1", 0, 1)));
    assert_eq!(
        follow_up,
        StreamResponse::Appended {
            offset: 1,
            next_offset: 2,
            closed: false,
            deduplicated: false,
            producer: Some(producer("writer-1", 0, 1)),
        }
    );
}
4532
/// With `stream_ttl_seconds` set, a renewing touch and an append each keep
/// the stream reachable for another window; after the window lapses the
/// stream is reported as not found.
#[test]
fn stream_ttl_uses_sliding_access_window() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    let stream_id = stream("ttl-window");

    // Created at t=1_000 with a one-second TTL and two bytes of payload.
    let created = machine.apply(StreamCommand::CreateStream {
        stream_id: stream_id.clone(),
        content_type: "application/octet-stream".to_owned(),
        initial_payload: b"hi".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        stream_ttl_seconds: Some(1),
        stream_expires_at_ms: None,
        forked_from: None,
        fork_offset: None,
        now_ms: 1_000,
    });
    assert_eq!(
        created,
        StreamResponse::Created {
            stream_id: stream_id.clone(),
            next_offset: 2,
            closed: false,
        }
    );

    // A non-renewing access at t=1_500 needs no write and the recorded
    // touch time is still the creation time.
    let passive_access = machine.access_requires_write(&stream_id, 1_500, false);
    assert_eq!(passive_access, Ok(false));
    let touched_at = machine
        .head_at(&stream_id, 1_500)
        .expect("head before ttl expiry")
        .last_ttl_touch_at_ms;
    assert_eq!(touched_at, 1_000);

    // A renewing access at the same instant does need a write.
    let renewing_access = machine.access_requires_write(&stream_id, 1_500, true);
    assert_eq!(renewing_access, Ok(true));
    let touch = machine.apply(StreamCommand::TouchStreamAccess {
        stream_id: stream_id.clone(),
        now_ms: 1_500,
        renew_ttl: true,
    });
    assert_eq!(
        touch,
        StreamResponse::Accessed {
            changed: true,
            expired: false,
        }
    );

    // After the touch at t=1_500 the stream is still usable at t=2_400.
    let readable_after_touch = machine.read_plan_at(&stream_id, 2, 16, 2_400).is_ok();
    assert!(readable_after_touch);
    let appended = machine.apply(StreamCommand::Append {
        stream_id: stream_id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"!".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 2_400,
    });
    assert_eq!(
        appended,
        StreamResponse::Appended {
            offset: 2,
            next_offset: 3,
            closed: false,
            deduplicated: false,
            producer: None,
        }
    );

    // Following the append at t=2_400 the stream is visible up to t=3_399
    // and gone from t=3_400 onwards.
    let visible_just_before = machine.head_at(&stream_id, 3_399).is_some();
    assert!(visible_just_before);
    let hidden_at_deadline = machine.head_at(&stream_id, 3_400).is_none();
    assert!(hidden_at_deadline);

    let late_append = machine.apply(StreamCommand::Append {
        stream_id: stream_id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"late".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 3_401,
    });
    assert!(matches!(
        late_append,
        StreamResponse::Error {
            code: StreamErrorCode::StreamNotFound,
            ..
        }
    ));
}
4624
/// With `stream_expires_at_ms` set, a renewing touch reports no change, reads
/// fail as not found from the deadline onwards, and the same id can then be
/// created again from scratch.
#[test]
fn stream_expires_at_is_absolute_and_recreate_after_expiry() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    let stream_id = stream("absolute-expiry");

    // Created at t=1_000 with an absolute deadline of t=2_000.
    let created = machine.apply(StreamCommand::CreateStream {
        stream_id: stream_id.clone(),
        content_type: "application/octet-stream".to_owned(),
        initial_payload: Vec::new(),
        close_after: false,
        stream_seq: None,
        producer: None,
        stream_ttl_seconds: None,
        stream_expires_at_ms: Some(2_000),
        forked_from: None,
        fork_offset: None,
        now_ms: 1_000,
    });
    assert!(matches!(created, StreamResponse::Created { .. }));

    // A renewing touch changes nothing on this stream.
    let touch = machine.apply(StreamCommand::TouchStreamAccess {
        stream_id: stream_id.clone(),
        now_ms: 1_500,
        renew_ttl: true,
    });
    assert_eq!(
        touch,
        StreamResponse::Accessed {
            changed: false,
            expired: false,
        }
    );

    let appended = machine.apply(StreamCommand::Append {
        stream_id: stream_id.clone(),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"body".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 1_600,
    });
    assert!(matches!(appended, StreamResponse::Appended { .. }));

    // Readable one millisecond before the deadline, not found at the deadline.
    let readable_before_deadline = machine.read_plan_at(&stream_id, 0, 16, 1_999).is_ok();
    assert!(readable_before_deadline);
    let missing_at_deadline = matches!(
        machine.read_plan_at(&stream_id, 0, 16, 2_000),
        Err(StreamResponse::Error {
            code: StreamErrorCode::StreamNotFound,
            ..
        })
    );
    assert!(missing_at_deadline);

    // After expiry the id is free again, even with a different content type.
    let recreated = machine.apply(StreamCommand::CreateStream {
        stream_id: stream_id.clone(),
        content_type: "text/plain".to_owned(),
        initial_payload: Vec::new(),
        close_after: false,
        stream_seq: None,
        producer: None,
        stream_ttl_seconds: None,
        stream_expires_at_ms: None,
        forked_from: None,
        fork_offset: None,
        now_ms: 2_001,
    });
    assert_eq!(
        recreated,
        StreamResponse::Created {
            stream_id,
            next_offset: 0,
            closed: false,
        }
    );
}
4699
/// A producer's closing append can be retried after the stream has closed and
/// is answered as a duplicate; the producer's next sequence number is refused
/// because the stream is closed.
#[test]
fn producer_duplicate_final_append_remains_idempotent_after_close() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "producer-close");

    // The closing append, sent twice with identical producer headers.
    let final_append = || StreamCommand::Append {
        stream_id: stream("producer-close"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"final".to_vec(),
        close_after: true,
        stream_seq: None,
        producer: Some(producer("writer-1", 0, 0)),
        now_ms: 0,
    };
    // Expected response; only the `deduplicated` flag differs between attempts.
    let closed_response = |deduplicated: bool| StreamResponse::Appended {
        offset: 0,
        next_offset: 5,
        closed: true,
        deduplicated,
        producer: Some(producer("writer-1", 0, 0)),
    };

    let first_attempt = machine.apply(final_append());
    assert_eq!(first_attempt, closed_response(false));

    let second_attempt = machine.apply(final_append());
    assert_eq!(second_attempt, closed_response(true));

    // A genuinely new append from the same producer hits the closed stream.
    let too_late = machine.apply(StreamCommand::Append {
        stream_id: stream("producer-close"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"too-late".to_vec(),
        close_after: false,
        stream_seq: None,
        producer: Some(producer("writer-1", 0, 1)),
        now_ms: 0,
    });
    assert!(matches!(
        too_late,
        StreamResponse::Error {
            code: StreamErrorCode::StreamClosed,
            next_offset: Some(5),
            ..
        }
    ));
}
4759
/// An append to a closed stream that also carries a different content type
/// and a lower `stream_seq` is answered with the closed error.
#[test]
fn append_conflict_precedence_reports_closed_before_mismatch_or_seq() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "closed-precedence");

    // Close the stream with a final append tagged with seq "0002".
    let closing_append = machine.apply(StreamCommand::Append {
        stream_id: stream("closed-precedence"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: b"final".to_vec(),
        close_after: true,
        stream_seq: Some("0002".to_owned()),
        producer: None,
        now_ms: 0,
    });
    assert_eq!(
        closing_append,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 5,
            closed: true,
            deduplicated: false,
            producer: None,
        }
    );

    // This request differs in content type ("text/plain") and uses an older
    // seq ("0001"), yet the reported code is `StreamClosed`.
    let conflicting_append = machine.apply(StreamCommand::Append {
        stream_id: stream("closed-precedence"),
        content_type: Some("text/plain".to_owned()),
        payload: b"too-late".to_vec(),
        close_after: false,
        stream_seq: Some("0001".to_owned()),
        producer: None,
        now_ms: 0,
    });
    assert!(matches!(
        conflicting_append,
        StreamResponse::Error {
            code: StreamErrorCode::StreamClosed,
            next_offset: Some(5),
            ..
        }
    ));
}
4802
/// Deleting a bucket fails while it still holds a stream and succeeds once
/// that stream has been deleted.
#[test]
fn bucket_delete_requires_empty_bucket() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "s-1");

    let delete_bucket = || StreamCommand::DeleteBucket {
        bucket_id: "benchcmp".to_owned(),
    };

    // The bucket still contains "s-1".
    let blocked = machine.apply(delete_bucket());
    assert!(matches!(
        blocked,
        StreamResponse::Error {
            code: StreamErrorCode::BucketNotEmpty,
            ..
        }
    ));

    // Remove the only stream.
    let stream_deleted = machine.apply(StreamCommand::DeleteStream {
        stream_id: stream("s-1"),
    });
    assert_eq!(
        stream_deleted,
        StreamResponse::Deleted {
            hard_deleted: true,
            parent_to_release: None,
        }
    );

    // The same bucket delete now goes through.
    let bucket_deleted = machine.apply(delete_bucket());
    assert_eq!(
        bucket_deleted,
        StreamResponse::BucketDeleted {
            bucket_id: "benchcmp".to_owned(),
        }
    );
}
4836
/// A stream holding a fork reference is only soft-deleted; releasing the
/// last reference afterwards hard-deletes it.
#[test]
fn fork_refs_soft_delete_and_release_parent_on_last_child() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "source");
    create_stream(&mut machine, "fork");

    let delete_stream = |name: &'static str| StreamCommand::DeleteStream {
        stream_id: stream(name),
    };

    // Register one fork reference on "source".
    let ref_added = machine.apply(StreamCommand::AddForkRef {
        stream_id: stream("source"),
        now_ms: 0,
    });
    assert_eq!(ref_added, StreamResponse::ForkRefAdded { fork_ref_count: 1 });

    // Deleting "source" while referenced is not a hard delete, and reads
    // against it report the stream as gone.
    let source_deleted = machine.apply(delete_stream("source"));
    assert_eq!(
        source_deleted,
        StreamResponse::Deleted {
            hard_deleted: false,
            parent_to_release: None,
        }
    );
    let source_gone = matches!(
        machine.read_plan(&stream("source"), 0, 1),
        Err(StreamResponse::Error {
            code: StreamErrorCode::StreamGone,
            ..
        })
    );
    assert!(source_gone);

    // "fork" itself has no references and is hard-deleted directly.
    let fork_deleted = machine.apply(delete_stream("fork"));
    assert_eq!(
        fork_deleted,
        StreamResponse::Deleted {
            hard_deleted: true,
            parent_to_release: None,
        }
    );

    // Dropping the last reference finishes the delete of "source".
    let ref_released = machine.apply(StreamCommand::ReleaseForkRef {
        stream_id: stream("source"),
    });
    assert_eq!(
        ref_released,
        StreamResponse::ForkRefReleased {
            hard_deleted: true,
            fork_ref_count: 0,
            parent_to_release: None,
        }
    );
    let source_removed = machine.head(&stream("source")).is_none();
    assert!(source_removed);
}
4888
/// Publishing a snapshot at a message boundary makes earlier offsets
/// unreadable, keeps later bytes readable, and exposes the snapshot through
/// both `read_snapshot` and `bootstrap_plan`.
#[test]
fn publish_snapshot_advances_retention_on_message_boundary() {
    let mut machine = StreamStateMachine::new();
    create_bucket(&mut machine);
    create_stream(&mut machine, "snap");

    // Plain append against "snap".
    let append = |payload: &[u8]| StreamCommand::Append {
        stream_id: stream("snap"),
        content_type: Some("application/octet-stream".to_owned()),
        payload: payload.to_vec(),
        close_after: false,
        stream_seq: None,
        producer: None,
        now_ms: 0,
    };

    // Two messages: bytes [0, 3) and [3, 5).
    let first = machine.apply(append(b"abc"));
    assert!(matches!(
        first,
        StreamResponse::Appended {
            offset: 0,
            next_offset: 3,
            ..
        }
    ));
    let second = machine.apply(append(b"de"));
    assert!(matches!(
        second,
        StreamResponse::Appended {
            offset: 3,
            next_offset: 5,
            ..
        }
    ));

    // Snapshot at offset 3, i.e. exactly between the two messages.
    let snapshot_body = br#"{"state":"abc"}"#;
    let published = machine.apply(StreamCommand::PublishSnapshot {
        stream_id: stream("snap"),
        snapshot_offset: 3,
        content_type: "application/json".to_owned(),
        payload: snapshot_body.to_vec(),
        now_ms: 0,
    });
    assert_eq!(
        published,
        StreamResponse::SnapshotPublished { snapshot_offset: 3 }
    );

    // Offsets before the snapshot are gone; the error points at offset 3.
    let prefix_gone = matches!(
        machine.read_plan(&stream("snap"), 0, 1),
        Err(StreamResponse::Error {
            code: StreamErrorCode::StreamGone,
            next_offset: Some(3),
            ..
        })
    );
    assert!(prefix_gone);

    // Bytes from the snapshot offset onwards are still served.
    let retained = machine.read(&stream("snap"), 3, 2).expect("retained read");
    assert_eq!(retained.payload, b"de");

    // The published snapshot can be fetched by its offset.
    let visible = machine
        .read_snapshot(&stream("snap"), 3)
        .expect("visible snapshot");
    assert_eq!(visible.content_type, "application/json");
    assert_eq!(visible.payload, snapshot_body);

    // Bootstrap returns the snapshot plus the single message after it.
    let plan = machine.bootstrap_plan(&stream("snap")).expect("bootstrap");
    assert_eq!(plan.snapshot.as_ref().map(|entry| entry.offset), Some(3));
    assert_eq!(
        plan.updates,
        vec![StreamMessageRecord {
            start_offset: 3,
            end_offset: 5,
        }]
    );
}
4965
4966    #[test]
4967    fn publish_snapshot_rejects_unaligned_offset() {
4968        let mut machine = StreamStateMachine::new();
4969        create_bucket(&mut machine);
4970        create_stream(&mut machine, "unaligned");
4971        assert!(matches!(
4972            machine.apply(StreamCommand::Append {
4973                stream_id: stream("unaligned"),
4974                content_type: Some("application/octet-stream".to_owned()),
4975                payload: b"abc".to_vec(),
4976                close_after: false,
4977                stream_seq: None,
4978                producer: None,
4979                now_ms: 0,
4980            }),
4981            StreamResponse::Appended { .. }
4982        ));
4983        assert!(matches!(
4984            machine.apply(StreamCommand::PublishSnapshot {
4985                stream_id: stream("unaligned"),
4986                snapshot_offset: 2,
4987                content_type: "application/octet-stream".to_owned(),
4988                payload: b"ab".to_vec(),
4989                now_ms: 0,
4990            }),
4991            StreamResponse::Error {
4992                code: StreamErrorCode::InvalidSnapshot,
4993                next_offset: Some(3),
4994                ..
4995            }
4996        ));
4997    }
4998
4999    #[test]
5000    fn snapshot_restore_preserves_visible_snapshot_and_message_records() {
5001        let mut machine = StreamStateMachine::new();
5002        create_bucket(&mut machine);
5003        create_stream(&mut machine, "restore-snap");
5004        let _ = machine.apply(StreamCommand::Append {
5005            stream_id: stream("restore-snap"),
5006            content_type: Some("application/octet-stream".to_owned()),
5007            payload: b"abc".to_vec(),
5008            close_after: false,
5009            stream_seq: None,
5010            producer: None,
5011            now_ms: 0,
5012        });
5013        let _ = machine.apply(StreamCommand::Append {
5014            stream_id: stream("restore-snap"),
5015            content_type: Some("application/octet-stream".to_owned()),
5016            payload: b"de".to_vec(),
5017            close_after: false,
5018            stream_seq: None,
5019            producer: None,
5020            now_ms: 0,
5021        });
5022        let _ = machine.apply(StreamCommand::PublishSnapshot {
5023            stream_id: stream("restore-snap"),
5024            snapshot_offset: 3,
5025            content_type: "application/octet-stream".to_owned(),
5026            payload: b"abc-state".to_vec(),
5027            now_ms: 0,
5028        });
5029
5030        let restored = StreamStateMachine::restore(machine.snapshot()).expect("restore");
5031        assert_eq!(
5032            restored
5033                .read_snapshot(&stream("restore-snap"), 3)
5034                .expect("snapshot")
5035                .payload,
5036            b"abc-state"
5037        );
5038        assert_eq!(
5039            restored
5040                .bootstrap_plan(&stream("restore-snap"))
5041                .expect("bootstrap")
5042                .updates,
5043            vec![StreamMessageRecord {
5044                start_offset: 3,
5045                end_offset: 5,
5046            }]
5047        );
5048    }
5049}