Skip to main content

ursula_stream/
state_machine.rs

1use std::collections::{HashMap, HashSet};
2
3use ursula_shard::BucketStreamId;
4
5use crate::command::StreamCommand;
6use crate::model::{
7    AppendExternalInput, AppendStreamInput, ColdChunkRef, ColdFlushCandidate, ExternalPayloadRef,
8    HotPayloadSegment, ObjectPayloadRef, ProducerAppendRecord, ProducerRequest, ProducerSnapshot,
9    ProducerState, StreamBatchAppend, StreamBatchAppendItem, StreamBootstrapPlan,
10    StreamMessageRecord, StreamMetadata, StreamRead, StreamReadObjectSegment, StreamReadPlan,
11    StreamReadSegment, StreamStatus, StreamVisibleSnapshot,
12};
13use crate::response::{StreamErrorCode, StreamResponse};
14use crate::snapshot::{StreamSnapshot, StreamSnapshotEntry, StreamSnapshotError};
15use crate::validate::{validate_bucket_id, validate_stream_id};
16
/// In-memory state for buckets and their streams.
///
/// Per-stream data is spread across several maps that are all keyed by the
/// same `BucketStreamId`. Per-stream collections that are empty are generally
/// not stored (see `restore`); accessors such as `cold_chunks` and
/// `hot_segments` treat a missing key as an empty slice.
#[derive(Debug, Clone, Default)]
pub struct StreamStateMachine {
    // Bucket ids that exist; `restore` rejects streams whose bucket is absent.
    buckets: HashSet<String>,
    // Metadata per stream, including soft-deleted entries (callers check
    // `is_soft_deleted` on values from this map).
    streams: HashMap<BucketStreamId, StreamMetadata>,
    // Hot (in-memory) payload bytes per stream; `hot_segments` index into
    // these bytes via `payload_start..payload_end`.
    payloads: HashMap<BucketStreamId, Vec<u8>>,
    // Maps stream offset ranges (`start_offset..end_offset`) onto byte ranges
    // of the stream's hot payload.
    hot_segments: HashMap<BucketStreamId, Vec<HotPayloadSegment>>,
    // Recorded hot start offset per stream; `hot_start_offset` reports 0 when absent.
    hot_start_offsets: HashMap<BucketStreamId, u64>,
    // Offset ranges that have been flushed to object storage via `flush_cold`.
    cold_chunks: HashMap<BucketStreamId, Vec<ColdChunkRef>>,
    // Object-backed payload ranges that are not cold chunks (presumably written
    // by the external create/append paths; confirm against those handlers).
    external_segments: HashMap<BucketStreamId, Vec<ObjectPayloadRef>>,
    // Message boundary records; `bootstrap_plan` returns those at or after the
    // visible snapshot offset.
    message_records: HashMap<BucketStreamId, Vec<StreamMessageRecord>>,
    // The single latest visible snapshot per stream; its offset is the
    // retained offset used by `restore` and `bootstrap_plan`.
    visible_snapshots: HashMap<BucketStreamId, StreamVisibleSnapshot>,
    // Producer id -> producer state per stream (NOTE(review): presumably used
    // for append de-duplication; confirm in the append handlers).
    producers: HashMap<BucketStreamId, HashMap<String, ProducerState>>,
}
30
31impl StreamStateMachine {
32    pub fn new() -> Self {
33        Self::default()
34    }
35
    /// Applies one `StreamCommand` to the state machine and returns its response.
    ///
    /// This is a dispatcher: each variant is destructured and forwarded to its
    /// handler, with owned fields moved into the handler input and optional
    /// strings/payloads passed by reference where the handler borrows them.
    /// Handler failures are returned as `StreamResponse` values.
    pub fn apply(&mut self, command: StreamCommand) -> StreamResponse {
        match command {
            StreamCommand::CreateBucket { bucket_id } => self.create_bucket(bucket_id),
            StreamCommand::DeleteBucket { bucket_id } => self.delete_bucket(&bucket_id),
            StreamCommand::CreateStream {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_stream(CreateStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            StreamCommand::CreateExternal {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_external_stream(CreateExternalStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            // The borrowed append path takes the payload and content type by reference.
            StreamCommand::Append {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_borrowed(AppendStreamInput {
                stream_id,
                content_type: content_type.as_deref(),
                payload: &payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            StreamCommand::AppendExternal {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_external(AppendExternalInput {
                stream_id,
                content_type: content_type.as_deref(),
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            // A successful batch is reported as the `Appended` response of its LAST
            // item, without producer state (`producer: None`). A batch that yields
            // no items is reported as `EmptyAppend`.
            StreamCommand::AppendBatch {
                stream_id,
                content_type,
                payloads,
                producer,
                now_ms,
            } => match self.append_batch_borrowed(
                stream_id,
                content_type.as_deref(),
                &payloads.iter().map(Vec::as_slice).collect::<Vec<_>>(),
                producer,
                now_ms,
            ) {
                Ok(batch) => batch
                    .items
                    .last()
                    .map(|item| StreamResponse::Appended {
                        offset: item.offset,
                        next_offset: item.next_offset,
                        closed: item.closed,
                        deduplicated: item.deduplicated,
                        producer: None,
                    })
                    .unwrap_or_else(|| {
                        StreamResponse::error(
                            StreamErrorCode::EmptyAppend,
                            "append batch must contain at least one payload",
                        )
                    }),
                Err(response) => response,
            },
            StreamCommand::PublishSnapshot {
                stream_id,
                snapshot_offset,
                content_type,
                payload,
                now_ms,
            } => self.publish_snapshot(stream_id, snapshot_offset, content_type, payload, now_ms),
            StreamCommand::TouchStreamAccess {
                stream_id,
                now_ms,
                renew_ttl,
            } => self.touch_stream_access(&stream_id, now_ms, renew_ttl),
            StreamCommand::AddForkRef { stream_id, now_ms } => {
                self.add_fork_ref(&stream_id, now_ms)
            }
            StreamCommand::ReleaseForkRef { stream_id } => self.release_fork_ref(&stream_id),
            StreamCommand::FlushCold { stream_id, chunk } => self.flush_cold(stream_id, chunk),
            StreamCommand::Close {
                stream_id,
                stream_seq,
                producer,
                now_ms,
            } => self.close(stream_id, stream_seq, producer, now_ms),
            StreamCommand::DeleteStream { stream_id } => self.delete_stream(&stream_id),
        }
    }
181
182    pub fn head(&self, stream_id: &BucketStreamId) -> Option<&StreamMetadata> {
183        self.streams.get(stream_id)
184    }
185
186    pub fn head_at(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> Option<&StreamMetadata> {
187        self.expire_stream_if_due(stream_id, now_ms);
188        self.streams.get(stream_id)
189    }
190
191    pub fn access_requires_write(
192        &self,
193        stream_id: &BucketStreamId,
194        now_ms: u64,
195        renew_ttl: bool,
196    ) -> Result<bool, StreamResponse> {
197        self.validate_stream_scope(stream_id)?;
198        let Some(stream) = self.streams.get(stream_id) else {
199            return Err(StreamResponse::error(
200                StreamErrorCode::StreamNotFound,
201                format!("stream '{stream_id}' does not exist"),
202            ));
203        };
204        if is_soft_deleted(stream) {
205            return Err(StreamResponse::error(
206                StreamErrorCode::StreamGone,
207                format!("stream '{stream_id}' is gone"),
208            ));
209        }
210        if stream_is_expired(stream, now_ms) {
211            return Ok(true);
212        }
213        Ok(renew_ttl
214            && stream.stream_ttl_seconds.is_some()
215            && stream.last_ttl_touch_at_ms != now_ms)
216    }
217
218    pub fn hot_start_offset(&self, stream_id: &BucketStreamId) -> u64 {
219        self.hot_start_offsets.get(stream_id).copied().unwrap_or(0)
220    }
221
222    pub fn cold_chunks(&self, stream_id: &BucketStreamId) -> &[ColdChunkRef] {
223        self.cold_chunks
224            .get(stream_id)
225            .map(Vec::as_slice)
226            .unwrap_or(&[])
227    }
228
229    pub fn external_segments(&self, stream_id: &BucketStreamId) -> &[ObjectPayloadRef] {
230        self.external_segments
231            .get(stream_id)
232            .map(Vec::as_slice)
233            .unwrap_or(&[])
234    }
235
236    pub fn hot_segments(&self, stream_id: &BucketStreamId) -> &[HotPayloadSegment] {
237        self.hot_segments
238            .get(stream_id)
239            .map(Vec::as_slice)
240            .unwrap_or(&[])
241    }
242
243    pub fn hot_payload_len(&self, stream_id: &BucketStreamId) -> Result<u64, StreamResponse> {
244        let Some(stream) = self.streams.get(stream_id) else {
245            return Err(StreamResponse::error(
246                StreamErrorCode::StreamNotFound,
247                format!("stream '{stream_id}' does not exist"),
248            ));
249        };
250        if is_soft_deleted(stream) {
251            return Err(StreamResponse::error(
252                StreamErrorCode::StreamGone,
253                format!("stream '{stream_id}' is gone"),
254            ));
255        }
256        let payload = self
257            .payloads
258            .get(stream_id)
259            .expect("payload vector exists for stream metadata");
260        Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
261    }
262
263    pub fn total_hot_payload_bytes(&self) -> u64 {
264        self.payloads
265            .values()
266            .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
267            .sum()
268    }
269
270    pub fn plan_cold_flush(
271        &self,
272        stream_id: &BucketStreamId,
273        min_hot_bytes: usize,
274        max_flush_bytes: usize,
275    ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
276        if max_flush_bytes == 0 {
277            return Ok(None);
278        }
279        let Some(stream) = self.streams.get(stream_id) else {
280            return Err(StreamResponse::error(
281                StreamErrorCode::StreamNotFound,
282                format!("stream '{stream_id}' does not exist"),
283            ));
284        };
285        if is_soft_deleted(stream) {
286            return Err(StreamResponse::error(
287                StreamErrorCode::StreamGone,
288                format!("stream '{stream_id}' is gone"),
289            ));
290        }
291        let Some(first_segment) = self.hot_segments(stream_id).first() else {
292            return Ok(None);
293        };
294        let mut payload_end = first_segment.payload_start;
295        let mut end_offset = first_segment.start_offset;
296        let mut flush_len = 0usize;
297        for segment in self.hot_segments(stream_id) {
298            if segment.start_offset != end_offset || segment.payload_start != payload_end {
299                break;
300            }
301            let remaining = max_flush_bytes.saturating_sub(flush_len);
302            if remaining == 0 {
303                break;
304            }
305            let segment_len = segment.payload_end - segment.payload_start;
306            let take = segment_len.min(remaining);
307            flush_len += take;
308            payload_end += take;
309            end_offset = end_offset.saturating_add(u64::try_from(take).expect("take fits u64"));
310            if take < segment_len {
311                break;
312            }
313        }
314        if flush_len < min_hot_bytes {
315            return Ok(None);
316        }
317        let payload = self
318            .payloads
319            .get(stream_id)
320            .expect("payload vector exists for stream metadata");
321        let start_offset = first_segment.start_offset;
322        Ok(Some(ColdFlushCandidate {
323            stream_id: stream_id.clone(),
324            start_offset,
325            end_offset,
326            payload: payload[first_segment.payload_start..payload_end].to_vec(),
327        }))
328    }
329
330    pub fn plan_next_cold_flush(
331        &self,
332        min_hot_bytes: usize,
333        max_flush_bytes: usize,
334    ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
335        if max_flush_bytes == 0 {
336            return Ok(None);
337        }
338        let mut stream_ids = self.streams.keys().cloned().collect::<Vec<_>>();
339        stream_ids.sort_by(compare_stream_ids);
340        for stream_id in stream_ids {
341            match self.plan_cold_flush(&stream_id, min_hot_bytes, max_flush_bytes) {
342                Ok(Some(candidate)) => return Ok(Some(candidate)),
343                Ok(None) => {}
344                Err(StreamResponse::Error {
345                    code: StreamErrorCode::StreamGone | StreamErrorCode::StreamNotFound,
346                    ..
347                }) => {}
348                Err(err) => return Err(err),
349            }
350        }
351        Ok(None)
352    }
353
354    pub fn plan_next_cold_flush_batch(
355        &self,
356        min_hot_bytes: usize,
357        max_flush_bytes: usize,
358        max_candidates: usize,
359    ) -> Result<Vec<ColdFlushCandidate>, StreamResponse> {
360        if max_candidates == 0 || max_flush_bytes == 0 {
361            return Ok(Vec::new());
362        }
363        let mut preview = self.clone();
364        let mut candidates = Vec::with_capacity(max_candidates);
365        while candidates.len() < max_candidates {
366            let Some(candidate) = preview.plan_next_cold_flush(min_hot_bytes, max_flush_bytes)?
367            else {
368                break;
369            };
370            let chunk = ColdChunkRef {
371                start_offset: candidate.start_offset,
372                end_offset: candidate.end_offset,
373                s3_path: "planned-cold-flush-batch".to_owned(),
374                object_size: u64::try_from(candidate.payload.len()).expect("payload len fits u64"),
375            };
376            match preview.flush_cold(candidate.stream_id.clone(), chunk) {
377                StreamResponse::ColdFlushed { .. } => candidates.push(candidate),
378                StreamResponse::Error { .. } => break,
379                other => {
380                    return Err(StreamResponse::error(
381                        StreamErrorCode::InvalidColdFlush,
382                        format!("unexpected cold flush planning response: {other:?}"),
383                    ));
384                }
385            }
386        }
387        Ok(candidates)
388    }
389
390    pub fn bucket_exists(&self, bucket_id: &str) -> bool {
391        self.buckets.contains(bucket_id)
392    }
393
394    pub fn snapshot(&self) -> StreamSnapshot {
395        let mut buckets = self.buckets.iter().cloned().collect::<Vec<_>>();
396        buckets.sort();
397
398        let mut streams = self
399            .streams
400            .values()
401            .cloned()
402            .map(|metadata| {
403                let stream_id = metadata.stream_id.clone();
404                let payload = self
405                    .payloads
406                    .get(&stream_id)
407                    .expect("payload vector exists for stream metadata")
408                    .clone();
409                let producer_states = self.producer_snapshot(&stream_id);
410                StreamSnapshotEntry {
411                    metadata,
412                    hot_start_offset: self.hot_start_offset(&stream_id),
413                    payload,
414                    hot_segments: self
415                        .hot_segments
416                        .get(&stream_id)
417                        .cloned()
418                        .unwrap_or_default(),
419                    cold_chunks: self
420                        .cold_chunks
421                        .get(&stream_id)
422                        .cloned()
423                        .unwrap_or_default(),
424                    external_segments: self
425                        .external_segments
426                        .get(&stream_id)
427                        .cloned()
428                        .unwrap_or_default(),
429                    message_records: self
430                        .message_records
431                        .get(&stream_id)
432                        .cloned()
433                        .unwrap_or_default(),
434                    visible_snapshot: self.visible_snapshots.get(&stream_id).cloned(),
435                    producer_states,
436                }
437            })
438            .collect::<Vec<_>>();
439        streams.sort_by(|left, right| {
440            compare_stream_ids(&left.metadata.stream_id, &right.metadata.stream_id)
441        });
442
443        StreamSnapshot { buckets, streams }
444    }
445
    /// Rebuilds a state machine from a `StreamSnapshot` (the type emitted by
    /// [`Self::snapshot`]), validating every stream entry before installing it.
    ///
    /// Per entry, in this order:
    /// 1. its bucket must be listed in the snapshot's buckets;
    /// 2. a visible snapshot must not lie past the stream tail;
    /// 3. hot segments must match the payload length (`hot_segments_match_payload`),
    ///    and cold chunks, external segments and hot segments must together cover
    ///    the retained range, from the visible snapshot offset (or 0) to the tail
    ///    (`payload_sources_cover_retained_suffix`);
    /// 4. message records must cover the same retained range;
    /// 5. the stream id must not repeat an earlier entry.
    ///
    /// # Errors
    /// `DuplicateBucket`, `MissingBucket`, `SnapshotOffsetOutOfRange`,
    /// `PayloadLengthMismatch`, `MessageBoundaryMismatch`, `DuplicateStream`, or
    /// any error from `restore_producer_states`.
    pub fn restore(snapshot: StreamSnapshot) -> Result<Self, StreamSnapshotError> {
        let mut machine = Self::default();
        for bucket_id in snapshot.buckets {
            if !machine.buckets.insert(bucket_id.clone()) {
                return Err(StreamSnapshotError::DuplicateBucket(bucket_id));
            }
        }

        for entry in snapshot.streams {
            let stream_id = entry.metadata.stream_id.clone();
            if !machine.buckets.contains(&stream_id.bucket_id) {
                return Err(StreamSnapshotError::MissingBucket(stream_id));
            }
            if let Some(snapshot) = entry.visible_snapshot.as_ref()
                && snapshot.offset > entry.metadata.tail_offset
            {
                return Err(StreamSnapshotError::SnapshotOffsetOutOfRange {
                    stream_id,
                    snapshot_offset: snapshot.offset,
                    tail_offset: entry.metadata.tail_offset,
                });
            }
            // Data before the visible snapshot offset is not required to be retained.
            let retained_offset = entry
                .visible_snapshot
                .as_ref()
                .map(|snapshot| snapshot.offset)
                .unwrap_or(0);
            // An entry with payload bytes but no explicit hot segments is treated as
            // one segment spanning `hot_start_offset..tail_offset` over the whole payload.
            let hot_segments = if entry.hot_segments.is_empty() && !entry.payload.is_empty() {
                vec![HotPayloadSegment {
                    start_offset: entry.hot_start_offset,
                    end_offset: entry.metadata.tail_offset,
                    payload_start: 0,
                    payload_end: entry.payload.len(),
                }]
            } else {
                entry.hot_segments
            };
            if !hot_segments_match_payload(&hot_segments, entry.payload.len())
                || !payload_sources_cover_retained_suffix(
                    &entry.cold_chunks,
                    &entry.external_segments,
                    &hot_segments,
                    retained_offset,
                    entry.metadata.tail_offset,
                )
            {
                return Err(StreamSnapshotError::PayloadLengthMismatch {
                    stream_id,
                    tail_offset: entry.metadata.tail_offset,
                    payload_len: entry.payload.len(),
                });
            }
            if !message_records_cover_retained_suffix(
                &entry.message_records,
                retained_offset,
                entry.metadata.tail_offset,
            ) {
                return Err(StreamSnapshotError::MessageBoundaryMismatch { stream_id });
            }
            // Duplicates are detected only after validation, so a duplicate entry that
            // also fails validation reports the validation error instead.
            if machine
                .streams
                .insert(entry.metadata.stream_id.clone(), entry.metadata)
                .is_some()
            {
                return Err(StreamSnapshotError::DuplicateStream(stream_id));
            }
            let producer_states = restore_producer_states(&stream_id, entry.producer_states)?;
            // Empty per-stream collections are not stored; the accessors treat a
            // missing key as an empty slice.
            if !hot_segments.is_empty() {
                machine.hot_segments.insert(stream_id.clone(), hot_segments);
            }
            if !entry.cold_chunks.is_empty() {
                machine
                    .cold_chunks
                    .insert(stream_id.clone(), entry.cold_chunks);
            }
            if !entry.external_segments.is_empty() {
                machine
                    .external_segments
                    .insert(stream_id.clone(), entry.external_segments);
            }
            if !entry.message_records.is_empty() {
                machine
                    .message_records
                    .insert(stream_id.clone(), entry.message_records);
            }
            if let Some(snapshot) = entry.visible_snapshot {
                machine
                    .visible_snapshots
                    .insert(stream_id.clone(), snapshot);
            }
            machine.payloads.insert(stream_id.clone(), entry.payload);
            machine.producers.insert(stream_id.clone(), producer_states);
            // `entry.hot_start_offset` is not stored directly (it is only used for the
            // synthesized segment above); presumably this recomputes the cached value
            // from the restored segments. Confirm in `refresh_hot_start_offset`.
            machine.refresh_hot_start_offset(&stream_id);
        }

        Ok(machine)
    }
543
544    pub fn read(
545        &self,
546        stream_id: &BucketStreamId,
547        offset: u64,
548        max_len: usize,
549    ) -> Result<StreamRead, StreamResponse> {
550        let plan = self.read_plan(stream_id, offset, max_len)?;
551        if plan
552            .segments
553            .iter()
554            .any(|segment| matches!(segment, StreamReadSegment::Object(_)))
555        {
556            return Err(StreamResponse::error_with_next_offset(
557                StreamErrorCode::InvalidColdFlush,
558                format!("stream '{stream_id}' read requires object payload store"),
559                plan.next_offset,
560            ));
561        }
562        let payload = plan
563            .segments
564            .iter()
565            .flat_map(|segment| match segment {
566                StreamReadSegment::Hot(payload) => payload.as_slice(),
567                StreamReadSegment::Object(_) => unreachable!("object segments checked above"),
568            })
569            .copied()
570            .collect();
571        Ok(StreamRead {
572            offset: plan.offset,
573            next_offset: plan.next_offset,
574            content_type: plan.content_type,
575            payload,
576            up_to_date: plan.up_to_date,
577            closed: plan.closed,
578        })
579    }
580
581    pub fn read_plan(
582        &self,
583        stream_id: &BucketStreamId,
584        offset: u64,
585        max_len: usize,
586    ) -> Result<StreamReadPlan, StreamResponse> {
587        self.read_plan_at(stream_id, offset, max_len, 0)
588    }
589
590    pub fn read_plan_at(
591        &self,
592        stream_id: &BucketStreamId,
593        offset: u64,
594        max_len: usize,
595        now_ms: u64,
596    ) -> Result<StreamReadPlan, StreamResponse> {
597        let Some(stream) = self.streams.get(stream_id) else {
598            return Err(StreamResponse::error(
599                StreamErrorCode::StreamNotFound,
600                format!("stream '{stream_id}' does not exist"),
601            ));
602        };
603        if is_soft_deleted(stream) {
604            return Err(StreamResponse::error(
605                StreamErrorCode::StreamGone,
606                format!("stream '{stream_id}' is gone"),
607            ));
608        }
609        if stream_is_expired(stream, now_ms) {
610            return Err(StreamResponse::error(
611                StreamErrorCode::StreamNotFound,
612                format!("stream '{stream_id}' does not exist"),
613            ));
614        }
615        if offset > stream.tail_offset {
616            return Err(StreamResponse::error_with_next_offset(
617                StreamErrorCode::OffsetOutOfRange,
618                format!(
619                    "offset {offset} is beyond stream '{}' tail {}",
620                    stream_id, stream.tail_offset
621                ),
622                stream.tail_offset,
623            ));
624        }
625        let retained_offset = self.earliest_retained_offset(stream_id);
626        if offset < retained_offset {
627            return Err(StreamResponse::error_with_next_offset(
628                StreamErrorCode::StreamGone,
629                format!(
630                    "offset {offset} is older than stream '{}' retained offset {retained_offset}",
631                    stream_id
632                ),
633                retained_offset,
634            ));
635        }
636
637        let max_len_u64 = u64::try_from(max_len).unwrap_or(u64::MAX);
638        let next_offset = stream.tail_offset.min(offset.saturating_add(max_len_u64));
639        let payload = self
640            .payloads
641            .get(stream_id)
642            .expect("payload vector exists for stream metadata");
643        let mut segments = Vec::<(u64, StreamReadSegment)>::new();
644        for chunk in self.cold_chunks(stream_id) {
645            let start = offset.max(chunk.start_offset);
646            let end = next_offset.min(chunk.end_offset);
647            if start < end {
648                segments.push((
649                    start,
650                    StreamReadSegment::Object(StreamReadObjectSegment {
651                        object: ObjectPayloadRef::from(chunk),
652                        read_start_offset: start,
653                        len: usize::try_from(end - start).expect("object read len fits usize"),
654                    }),
655                ));
656            }
657        }
658        for object in self.external_segments(stream_id) {
659            let start = offset.max(object.start_offset);
660            let end = next_offset.min(object.end_offset);
661            if start < end {
662                segments.push((
663                    start,
664                    StreamReadSegment::Object(StreamReadObjectSegment {
665                        object: object.clone(),
666                        read_start_offset: start,
667                        len: usize::try_from(end - start).expect("object read len fits usize"),
668                    }),
669                ));
670            }
671        }
672        for segment in self.hot_segments(stream_id) {
673            let start = offset.max(segment.start_offset);
674            let end = next_offset.min(segment.end_offset);
675            if start < end {
676                let payload_start = segment.payload_start
677                    + usize::try_from(start - segment.start_offset)
678                        .expect("hot segment start fits usize");
679                let payload_end = segment.payload_start
680                    + usize::try_from(end - segment.start_offset)
681                        .expect("hot segment end fits usize");
682                segments.push((
683                    start,
684                    StreamReadSegment::Hot(payload[payload_start..payload_end].to_vec()),
685                ));
686            }
687        }
688        segments.sort_by_key(|(start, _)| *start);
689        if !segments_cover_range(&segments, offset, next_offset) {
690            return Err(StreamResponse::error_with_next_offset(
691                StreamErrorCode::InvalidColdFlush,
692                format!("stream '{stream_id}' has missing payload segment metadata"),
693                next_offset,
694            ));
695        }
696        Ok(StreamReadPlan {
697            offset,
698            next_offset,
699            content_type: stream.content_type.clone(),
700            segments: segments.into_iter().map(|(_, segment)| segment).collect(),
701            up_to_date: next_offset == stream.tail_offset,
702            closed: stream.status == StreamStatus::Closed,
703        })
704    }
705
706    pub fn latest_snapshot(
707        &self,
708        stream_id: &BucketStreamId,
709    ) -> Result<Option<StreamVisibleSnapshot>, StreamResponse> {
710        let Some(stream) = self.streams.get(stream_id) else {
711            return Err(StreamResponse::error(
712                StreamErrorCode::StreamNotFound,
713                format!("stream '{stream_id}' does not exist"),
714            ));
715        };
716        if is_soft_deleted(stream) {
717            return Err(StreamResponse::error(
718                StreamErrorCode::StreamGone,
719                format!("stream '{stream_id}' is gone"),
720            ));
721        }
722        Ok(self.visible_snapshots.get(stream_id).cloned())
723    }
724
725    pub fn read_snapshot(
726        &self,
727        stream_id: &BucketStreamId,
728        snapshot_offset: u64,
729    ) -> Result<StreamVisibleSnapshot, StreamResponse> {
730        let snapshot = self.latest_snapshot(stream_id)?;
731        match snapshot {
732            Some(snapshot) if snapshot.offset == snapshot_offset => Ok(snapshot),
733            _ => Err(StreamResponse::error(
734                StreamErrorCode::SnapshotNotFound,
735                format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
736            )),
737        }
738    }
739
740    pub fn delete_snapshot(
741        &self,
742        stream_id: &BucketStreamId,
743        snapshot_offset: u64,
744    ) -> StreamResponse {
745        match self.latest_snapshot(stream_id) {
746            Ok(Some(snapshot)) if snapshot.offset == snapshot_offset => StreamResponse::error(
747                StreamErrorCode::SnapshotConflict,
748                format!(
749                    "snapshot {snapshot_offset} for stream '{stream_id}' is the latest visible snapshot"
750                ),
751            ),
752            Ok(_) => StreamResponse::error(
753                StreamErrorCode::SnapshotNotFound,
754                format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
755            ),
756            Err(err) => err,
757        }
758    }
759
760    pub fn bootstrap_plan(
761        &self,
762        stream_id: &BucketStreamId,
763    ) -> Result<StreamBootstrapPlan, StreamResponse> {
764        let Some(stream) = self.streams.get(stream_id) else {
765            return Err(StreamResponse::error(
766                StreamErrorCode::StreamNotFound,
767                format!("stream '{stream_id}' does not exist"),
768            ));
769        };
770        if is_soft_deleted(stream) {
771            return Err(StreamResponse::error(
772                StreamErrorCode::StreamGone,
773                format!("stream '{stream_id}' is gone"),
774            ));
775        }
776        let snapshot = self.visible_snapshots.get(stream_id).cloned();
777        let retained_offset = snapshot
778            .as_ref()
779            .map(|snapshot| snapshot.offset)
780            .unwrap_or(0);
781        let updates = self
782            .message_records
783            .get(stream_id)
784            .map(|records| {
785                records
786                    .iter()
787                    .filter(|record| record.start_offset >= retained_offset)
788                    .cloned()
789                    .collect::<Vec<_>>()
790            })
791            .unwrap_or_default();
792        Ok(StreamBootstrapPlan {
793            snapshot,
794            updates,
795            next_offset: stream.tail_offset,
796            content_type: stream.content_type.clone(),
797            up_to_date: true,
798            closed: stream.status == StreamStatus::Closed,
799        })
800    }
801
802    fn publish_snapshot(
803        &mut self,
804        stream_id: BucketStreamId,
805        snapshot_offset: u64,
806        content_type: String,
807        payload: Vec<u8>,
808        now_ms: u64,
809    ) -> StreamResponse {
810        if let Err(response) = self.validate_stream_scope(&stream_id) {
811            return response;
812        }
813        if content_type.trim().is_empty() {
814            return StreamResponse::error(
815                StreamErrorCode::InvalidSnapshot,
816                "snapshot content type must not be empty",
817            );
818        }
819        let Some(stream) = self.streams.get(&stream_id) else {
820            return StreamResponse::error(
821                StreamErrorCode::StreamNotFound,
822                format!("stream '{stream_id}' does not exist"),
823            );
824        };
825        if is_soft_deleted(stream) {
826            return StreamResponse::error(
827                StreamErrorCode::StreamGone,
828                format!("stream '{stream_id}' is gone"),
829            );
830        }
831        if stream_is_expired(stream, now_ms) {
832            self.remove_stream_state(&stream_id);
833            return StreamResponse::error(
834                StreamErrorCode::StreamNotFound,
835                format!("stream '{stream_id}' does not exist"),
836            );
837        }
838        let tail_offset = stream.tail_offset;
839        let retained_offset = self.earliest_retained_offset(&stream_id);
840        if snapshot_offset < retained_offset {
841            return StreamResponse::error_with_next_offset(
842                StreamErrorCode::StreamGone,
843                format!(
844                    "snapshot offset {snapshot_offset} is older than stream '{}' retained offset {retained_offset}",
845                    stream_id
846                ),
847                retained_offset,
848            );
849        }
850        if snapshot_offset > tail_offset {
851            return StreamResponse::error_with_next_offset(
852                StreamErrorCode::SnapshotConflict,
853                format!(
854                    "snapshot offset {snapshot_offset} is beyond stream '{}' tail {tail_offset}",
855                    stream_id
856                ),
857                tail_offset,
858            );
859        }
860        if !self.snapshot_offset_aligned(&stream_id, snapshot_offset, retained_offset) {
861            return StreamResponse::error_with_next_offset(
862                StreamErrorCode::InvalidSnapshot,
863                format!(
864                    "snapshot offset {snapshot_offset} is not aligned to a committed message boundary for stream '{stream_id}'"
865                ),
866                tail_offset,
867            );
868        }
869
870        self.visible_snapshots.insert(
871            stream_id.clone(),
872            StreamVisibleSnapshot {
873                offset: snapshot_offset,
874                content_type,
875                payload,
876            },
877        );
878        self.compact_retained_prefix(&stream_id, snapshot_offset);
879        StreamResponse::SnapshotPublished { snapshot_offset }
880    }
881
882    fn flush_cold(&mut self, stream_id: BucketStreamId, chunk: ColdChunkRef) -> StreamResponse {
883        if let Err(response) = self.validate_stream_scope(&stream_id) {
884            return response;
885        }
886        if chunk.s3_path.trim().is_empty() {
887            return StreamResponse::error(
888                StreamErrorCode::InvalidColdFlush,
889                "cold chunk S3 path must not be empty",
890            );
891        }
892        if chunk.object_size == 0 {
893            return StreamResponse::error(
894                StreamErrorCode::InvalidColdFlush,
895                "cold chunk object size must be greater than zero",
896            );
897        }
898        let Some(stream) = self.streams.get(&stream_id) else {
899            return StreamResponse::error(
900                StreamErrorCode::StreamNotFound,
901                format!("stream '{stream_id}' does not exist"),
902            );
903        };
904        if is_soft_deleted(stream) {
905            return StreamResponse::error(
906                StreamErrorCode::StreamGone,
907                format!("stream '{stream_id}' is gone"),
908            );
909        }
910        if chunk.end_offset <= chunk.start_offset {
911            return StreamResponse::error_with_next_offset(
912                StreamErrorCode::InvalidColdFlush,
913                "cold chunk must cover at least one byte",
914                stream.tail_offset,
915            );
916        }
917        if chunk.end_offset > stream.tail_offset {
918            return StreamResponse::error_with_next_offset(
919                StreamErrorCode::InvalidColdFlush,
920                format!(
921                    "cold chunk end {} is beyond stream '{}' tail {}",
922                    chunk.end_offset, stream_id, stream.tail_offset
923                ),
924                stream.tail_offset,
925            );
926        }
927        let segments = self.hot_segments(&stream_id);
928        let Some(segment_index) = segments
929            .iter()
930            .position(|segment| segment.start_offset == chunk.start_offset)
931        else {
932            return StreamResponse::error_with_next_offset(
933                StreamErrorCode::InvalidColdFlush,
934                format!(
935                    "cold chunk for stream '{stream_id}' does not match the start of a hot payload segment"
936                ),
937                stream.tail_offset,
938            );
939        };
940
941        let drain_start = segments[segment_index].payload_start;
942        let mut covered_offset = chunk.start_offset;
943        let mut flush_len = 0usize;
944        for segment in segments.iter().skip(segment_index) {
945            if segment.start_offset != covered_offset {
946                break;
947            }
948            let segment_cover_end = chunk.end_offset.min(segment.end_offset);
949            let segment_flush_len = match usize::try_from(segment_cover_end - segment.start_offset)
950            {
951                Ok(segment_flush_len) => segment_flush_len,
952                Err(_) => {
953                    return StreamResponse::error_with_next_offset(
954                        StreamErrorCode::InvalidColdFlush,
955                        "cold chunk length does not fit in memory",
956                        stream.tail_offset,
957                    );
958                }
959            };
960            let Some(expected_payload_start) = drain_start.checked_add(flush_len) else {
961                return StreamResponse::error_with_next_offset(
962                    StreamErrorCode::InvalidColdFlush,
963                    "cold chunk length does not fit in memory",
964                    stream.tail_offset,
965                );
966            };
967            if segment.payload_start != expected_payload_start {
968                return StreamResponse::error_with_next_offset(
969                    StreamErrorCode::InvalidColdFlush,
970                    format!("stream '{stream_id}' has non-contiguous hot payload metadata"),
971                    stream.tail_offset,
972                );
973            }
974            let segment_payload_len = segment.payload_end - segment.payload_start;
975            if segment_flush_len > segment_payload_len {
976                return StreamResponse::error_with_next_offset(
977                    StreamErrorCode::InvalidColdFlush,
978                    format!("cold chunk length exceeds stream '{stream_id}' hot segment metadata"),
979                    stream.tail_offset,
980                );
981            }
982            let Some(new_flush_len) = flush_len.checked_add(segment_flush_len) else {
983                return StreamResponse::error_with_next_offset(
984                    StreamErrorCode::InvalidColdFlush,
985                    "cold chunk length does not fit in memory",
986                    stream.tail_offset,
987                );
988            };
989            flush_len = new_flush_len;
990            covered_offset = segment_cover_end;
991            if covered_offset == chunk.end_offset {
992                break;
993            }
994        }
995        if covered_offset != chunk.end_offset {
996            return StreamResponse::error_with_next_offset(
997                StreamErrorCode::InvalidColdFlush,
998                format!(
999                    "cold chunk for stream '{stream_id}' does not cover contiguous hot payload segments"
1000                ),
1001                stream.tail_offset,
1002            );
1003        }
1004        let Some(drain_end) = drain_start.checked_add(flush_len) else {
1005            return StreamResponse::error_with_next_offset(
1006                StreamErrorCode::InvalidColdFlush,
1007                "cold chunk length does not fit in memory",
1008                stream.tail_offset,
1009            );
1010        };
1011        let payload_len = self
1012            .payloads
1013            .get(&stream_id)
1014            .expect("payload vector exists for stream metadata")
1015            .len();
1016        if drain_end > payload_len {
1017            return StreamResponse::error_with_next_offset(
1018                StreamErrorCode::InvalidColdFlush,
1019                format!("cold chunk length exceeds stream '{stream_id}' hot payload length"),
1020                stream.tail_offset,
1021            );
1022        };
1023
1024        self.payloads
1025            .get_mut(&stream_id)
1026            .expect("payload vector exists for stream metadata")
1027            .drain(drain_start..drain_end);
1028        self.remove_drained_hot_range(
1029            &stream_id,
1030            segment_index,
1031            chunk.end_offset,
1032            drain_start,
1033            flush_len,
1034        );
1035        self.cold_chunks
1036            .entry(stream_id.clone())
1037            .or_default()
1038            .push(chunk.clone());
1039        self.refresh_hot_start_offset(&stream_id);
1040        StreamResponse::ColdFlushed {
1041            hot_start_offset: self.hot_start_offset(&stream_id),
1042        }
1043    }
1044
1045    fn create_bucket(&mut self, bucket_id: String) -> StreamResponse {
1046        if let Err(message) = validate_bucket_id(&bucket_id) {
1047            return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1048        }
1049        if !self.buckets.insert(bucket_id.clone()) {
1050            return StreamResponse::BucketAlreadyExists { bucket_id };
1051        }
1052        StreamResponse::BucketCreated { bucket_id }
1053    }
1054
1055    fn delete_bucket(&mut self, bucket_id: &str) -> StreamResponse {
1056        if let Err(message) = validate_bucket_id(bucket_id) {
1057            return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1058        }
1059        if !self.buckets.contains(bucket_id) {
1060            return StreamResponse::error(
1061                StreamErrorCode::BucketNotFound,
1062                format!("bucket '{bucket_id}' does not exist"),
1063            );
1064        }
1065        if self
1066            .streams
1067            .keys()
1068            .any(|stream_id| stream_id.bucket_id == bucket_id)
1069        {
1070            return StreamResponse::error(
1071                StreamErrorCode::BucketNotEmpty,
1072                format!("bucket '{bucket_id}' is not empty"),
1073            );
1074        }
1075        self.buckets.remove(bucket_id);
1076        StreamResponse::BucketDeleted {
1077            bucket_id: bucket_id.to_owned(),
1078        }
1079    }
1080
1081    fn create_stream(&mut self, input: CreateStreamInput) -> StreamResponse {
1082        if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1083            return response;
1084        }
1085        if let Err(response) =
1086            validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1087        {
1088            return response;
1089        }
1090        if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1091            return response;
1092        }
1093        if let Some(producer) = input.producer.as_ref()
1094            && producer.producer_seq != 0
1095        {
1096            return StreamResponse::error(
1097                StreamErrorCode::ProducerSeqConflict,
1098                format!(
1099                    "producer '{}' expected sequence 0, received {}",
1100                    producer.producer_id, producer.producer_seq
1101                ),
1102            );
1103        }
1104        if self
1105            .streams
1106            .get(&input.stream_id)
1107            .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1108        {
1109            self.remove_stream_state(&input.stream_id);
1110        }
1111
1112        if let Some(existing) = self.streams.get(&input.stream_id) {
1113            if is_soft_deleted(existing) {
1114                return StreamResponse::error(
1115                    StreamErrorCode::StreamAlreadyExistsConflict,
1116                    format!(
1117                        "stream '{}' is gone and cannot be recreated yet",
1118                        input.stream_id
1119                    ),
1120                );
1121            }
1122            if existing.content_type == input.content_type
1123                && existing.status == status_from_closed(input.close_after)
1124                && existing.stream_ttl_seconds == input.stream_ttl_seconds
1125                && existing.stream_expires_at_ms == input.stream_expires_at_ms
1126                && existing.forked_from == input.forked_from
1127                && existing.fork_offset == input.fork_offset
1128            {
1129                return StreamResponse::AlreadyExists {
1130                    next_offset: existing.tail_offset,
1131                    closed: existing.status == StreamStatus::Closed,
1132                    content_type: existing.content_type.clone(),
1133                    stream_ttl_seconds: existing.stream_ttl_seconds,
1134                    stream_expires_at_ms: existing.stream_expires_at_ms,
1135                };
1136            }
1137            return StreamResponse::error(
1138                StreamErrorCode::StreamAlreadyExistsConflict,
1139                format!(
1140                    "stream '{}' already exists with different metadata",
1141                    input.stream_id
1142                ),
1143            );
1144        }
1145
1146        let initial_len = input.initial_len();
1147        let metadata = StreamMetadata {
1148            stream_id: input.stream_id.clone(),
1149            content_type: input.content_type,
1150            status: status_from_closed(input.close_after),
1151            tail_offset: initial_len,
1152            last_stream_seq: input.stream_seq,
1153            stream_ttl_seconds: input.stream_ttl_seconds,
1154            stream_expires_at_ms: input.stream_expires_at_ms,
1155            created_at_ms: input.now_ms,
1156            last_ttl_touch_at_ms: input.now_ms,
1157            forked_from: input.forked_from,
1158            fork_offset: input.fork_offset,
1159            fork_ref_count: 0,
1160        };
1161        self.streams.insert(input.stream_id.clone(), metadata);
1162        self.payloads
1163            .insert(input.stream_id.clone(), input.initial_payload);
1164        if initial_len > 0 {
1165            self.hot_segments.insert(
1166                input.stream_id.clone(),
1167                vec![HotPayloadSegment {
1168                    start_offset: 0,
1169                    end_offset: initial_len,
1170                    payload_start: 0,
1171                    payload_end: usize::try_from(initial_len).expect("payload len fits usize"),
1172                }],
1173            );
1174            self.message_records.insert(
1175                input.stream_id.clone(),
1176                vec![StreamMessageRecord {
1177                    start_offset: 0,
1178                    end_offset: initial_len,
1179                }],
1180            );
1181        }
1182        let mut producer_states = HashMap::new();
1183        if let Some(producer) = input.producer {
1184            let last_item = ProducerAppendRecord {
1185                start_offset: 0,
1186                next_offset: initial_len,
1187                closed: input.close_after,
1188            };
1189            producer_states.insert(
1190                producer.producer_id,
1191                ProducerState {
1192                    producer_epoch: producer.producer_epoch,
1193                    producer_seq: producer.producer_seq,
1194                    last_start_offset: last_item.start_offset,
1195                    last_next_offset: last_item.next_offset,
1196                    last_closed: last_item.closed,
1197                    last_items: vec![last_item],
1198                },
1199            );
1200        }
1201        self.producers
1202            .insert(input.stream_id.clone(), producer_states);
1203        StreamResponse::Created {
1204            stream_id: input.stream_id,
1205            next_offset: initial_len,
1206            closed: input.close_after,
1207        }
1208    }
1209
1210    fn create_external_stream(&mut self, input: CreateExternalStreamInput) -> StreamResponse {
1211        if let Err(response) = validate_external_payload_ref(&input.initial_payload) {
1212            return response;
1213        }
1214        if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1215            return response;
1216        }
1217        if let Err(response) =
1218            validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1219        {
1220            return response;
1221        }
1222        if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1223            return response;
1224        }
1225        if let Some(producer) = input.producer.as_ref()
1226            && producer.producer_seq != 0
1227        {
1228            return StreamResponse::error(
1229                StreamErrorCode::ProducerSeqConflict,
1230                format!(
1231                    "producer '{}' expected sequence 0, received {}",
1232                    producer.producer_id, producer.producer_seq
1233                ),
1234            );
1235        }
1236        if self
1237            .streams
1238            .get(&input.stream_id)
1239            .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1240        {
1241            self.remove_stream_state(&input.stream_id);
1242        }
1243
1244        if let Some(existing) = self.streams.get(&input.stream_id) {
1245            if is_soft_deleted(existing) {
1246                return StreamResponse::error(
1247                    StreamErrorCode::StreamAlreadyExistsConflict,
1248                    format!(
1249                        "stream '{}' is gone and cannot be recreated yet",
1250                        input.stream_id
1251                    ),
1252                );
1253            }
1254            if existing.content_type == input.content_type
1255                && existing.status == status_from_closed(input.close_after)
1256                && existing.stream_ttl_seconds == input.stream_ttl_seconds
1257                && existing.stream_expires_at_ms == input.stream_expires_at_ms
1258                && existing.forked_from == input.forked_from
1259                && existing.fork_offset == input.fork_offset
1260            {
1261                return StreamResponse::AlreadyExists {
1262                    next_offset: existing.tail_offset,
1263                    closed: existing.status == StreamStatus::Closed,
1264                    content_type: existing.content_type.clone(),
1265                    stream_ttl_seconds: existing.stream_ttl_seconds,
1266                    stream_expires_at_ms: existing.stream_expires_at_ms,
1267                };
1268            }
1269            return StreamResponse::error(
1270                StreamErrorCode::StreamAlreadyExistsConflict,
1271                format!(
1272                    "stream '{}' already exists with different metadata",
1273                    input.stream_id
1274                ),
1275            );
1276        }
1277
1278        let initial_len = input.initial_payload.payload_len;
1279        let metadata = StreamMetadata {
1280            stream_id: input.stream_id.clone(),
1281            content_type: input.content_type,
1282            status: status_from_closed(input.close_after),
1283            tail_offset: initial_len,
1284            last_stream_seq: input.stream_seq,
1285            stream_ttl_seconds: input.stream_ttl_seconds,
1286            stream_expires_at_ms: input.stream_expires_at_ms,
1287            created_at_ms: input.now_ms,
1288            last_ttl_touch_at_ms: input.now_ms,
1289            forked_from: input.forked_from,
1290            fork_offset: input.fork_offset,
1291            fork_ref_count: 0,
1292        };
1293        self.streams.insert(input.stream_id.clone(), metadata);
1294        self.payloads.insert(input.stream_id.clone(), Vec::new());
1295        self.external_segments.insert(
1296            input.stream_id.clone(),
1297            vec![ObjectPayloadRef {
1298                start_offset: 0,
1299                end_offset: initial_len,
1300                s3_path: input.initial_payload.s3_path,
1301                object_size: input.initial_payload.object_size,
1302            }],
1303        );
1304        self.message_records.insert(
1305            input.stream_id.clone(),
1306            vec![StreamMessageRecord {
1307                start_offset: 0,
1308                end_offset: initial_len,
1309            }],
1310        );
1311        let mut producer_states = HashMap::new();
1312        if let Some(producer) = input.producer {
1313            let last_item = ProducerAppendRecord {
1314                start_offset: 0,
1315                next_offset: initial_len,
1316                closed: input.close_after,
1317            };
1318            producer_states.insert(
1319                producer.producer_id,
1320                ProducerState {
1321                    producer_epoch: producer.producer_epoch,
1322                    producer_seq: producer.producer_seq,
1323                    last_start_offset: last_item.start_offset,
1324                    last_next_offset: last_item.next_offset,
1325                    last_closed: last_item.closed,
1326                    last_items: vec![last_item],
1327                },
1328            );
1329        }
1330        self.producers
1331            .insert(input.stream_id.clone(), producer_states);
1332        StreamResponse::Created {
1333            stream_id: input.stream_id,
1334            next_offset: initial_len,
1335            closed: input.close_after,
1336        }
1337    }
1338
1339    pub fn append_borrowed(&mut self, input: AppendStreamInput<'_>) -> StreamResponse {
1340        let AppendStreamInput {
1341            stream_id,
1342            content_type,
1343            payload,
1344            close_after,
1345            stream_seq,
1346            producer,
1347            now_ms,
1348        } = input;
1349        if let Err(response) = self.validate_stream_scope(&stream_id) {
1350            return response;
1351        }
1352        if let Err(response) = validate_producer_request(producer.as_ref()) {
1353            return response;
1354        }
1355
1356        let Some(_) = self.streams.get(&stream_id) else {
1357            return StreamResponse::error(
1358                StreamErrorCode::StreamNotFound,
1359                format!("stream '{stream_id}' does not exist"),
1360            );
1361        };
1362        if self.expire_stream_if_due(&stream_id, now_ms) {
1363            return StreamResponse::error(
1364                StreamErrorCode::StreamNotFound,
1365                format!("stream '{stream_id}' does not exist"),
1366            );
1367        }
1368        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1369            return StreamResponse::error(
1370                StreamErrorCode::StreamGone,
1371                format!("stream '{stream_id}' is gone"),
1372            );
1373        }
1374        let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1375            Ok(decision) => decision,
1376            Err(response) => return response,
1377        };
1378        if let ProducerDecision::Duplicate {
1379            offset,
1380            next_offset,
1381            closed,
1382            producer,
1383            ..
1384        } = producer_decision
1385        {
1386            if payload.is_empty() {
1387                return StreamResponse::Closed {
1388                    next_offset,
1389                    deduplicated: true,
1390                    producer: Some(producer),
1391                };
1392            }
1393            return StreamResponse::Appended {
1394                offset,
1395                next_offset,
1396                closed,
1397                deduplicated: true,
1398                producer: Some(producer),
1399            };
1400        }
1401
1402        let Some(stream) = self.streams.get_mut(&stream_id) else {
1403            unreachable!("stream existence checked before producer evaluation");
1404        };
1405
1406        if stream.status == StreamStatus::Closed {
1407            if close_after && payload.is_empty() {
1408                return StreamResponse::Closed {
1409                    next_offset: stream.tail_offset,
1410                    deduplicated: false,
1411                    producer: None,
1412                };
1413            }
1414            return StreamResponse::error_with_next_offset(
1415                StreamErrorCode::StreamClosed,
1416                format!("stream '{stream_id}' is closed"),
1417                stream.tail_offset,
1418            );
1419        }
1420
1421        if payload.is_empty() && !close_after {
1422            return StreamResponse::error(
1423                StreamErrorCode::EmptyAppend,
1424                "append payload must be non-empty unless closing the stream",
1425            );
1426        }
1427
1428        if !payload.is_empty() {
1429            let Some(content_type) = content_type else {
1430                return StreamResponse::error(
1431                    StreamErrorCode::MissingContentType,
1432                    "append with a body must include content type",
1433                );
1434            };
1435            if content_type != stream.content_type {
1436                return StreamResponse::error_with_next_offset(
1437                    StreamErrorCode::ContentTypeMismatch,
1438                    format!(
1439                        "append content type '{content_type}' does not match stream content type '{}'",
1440                        stream.content_type
1441                    ),
1442                    stream.tail_offset,
1443                );
1444            }
1445        }
1446
1447        if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1448            return response;
1449        }
1450
1451        let offset = stream.tail_offset;
1452        let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1453        stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1454        if let Some(seq) = stream_seq {
1455            stream.last_stream_seq = Some(seq);
1456        }
1457        renew_stream_ttl(stream, now_ms);
1458        if close_after {
1459            stream.status = StreamStatus::Closed;
1460        }
1461        let closed = stream.status == StreamStatus::Closed;
1462        let next_offset = stream.tail_offset;
1463        let producer_ack = producer.clone();
1464        if let Some(producer) = producer {
1465            self.record_producer_success(
1466                stream_id.clone(),
1467                producer,
1468                ProducerAppendRecord {
1469                    start_offset: offset,
1470                    next_offset,
1471                    closed,
1472                },
1473                vec![ProducerAppendRecord {
1474                    start_offset: offset,
1475                    next_offset,
1476                    closed,
1477                }],
1478            );
1479        }
1480
1481        if payload.is_empty() {
1482            StreamResponse::Closed {
1483                next_offset,
1484                deduplicated: false,
1485                producer: producer_ack,
1486            }
1487        } else {
1488            let payload_store = self
1489                .payloads
1490                .get_mut(&stream_id)
1491                .expect("payload vector exists for stream metadata");
1492            let payload_start = payload_store.len();
1493            payload_store.extend_from_slice(payload);
1494            let payload_end = payload_store.len();
1495            self.hot_segments
1496                .get_mut(&stream_id)
1497                .map(|segments| {
1498                    segments.push(HotPayloadSegment {
1499                        start_offset: offset,
1500                        end_offset: next_offset,
1501                        payload_start,
1502                        payload_end,
1503                    })
1504                })
1505                .unwrap_or_else(|| {
1506                    self.hot_segments.insert(
1507                        stream_id.clone(),
1508                        vec![HotPayloadSegment {
1509                            start_offset: offset,
1510                            end_offset: next_offset,
1511                            payload_start,
1512                            payload_end,
1513                        }],
1514                    );
1515                });
1516            self.refresh_hot_start_offset(&stream_id);
1517            self.message_records
1518                .entry(stream_id.clone())
1519                .or_default()
1520                .push(StreamMessageRecord {
1521                    start_offset: offset,
1522                    end_offset: next_offset,
1523                });
1524            StreamResponse::Appended {
1525                offset,
1526                next_offset,
1527                closed: close_after,
1528                deduplicated: false,
1529                producer: producer_ack,
1530            }
1531        }
1532    }
1533
1534    fn append_external(&mut self, input: AppendExternalInput<'_>) -> StreamResponse {
1535        let AppendExternalInput {
1536            stream_id,
1537            content_type,
1538            payload,
1539            close_after,
1540            stream_seq,
1541            producer,
1542            now_ms,
1543        } = input;
1544        if let Err(response) = validate_external_payload_ref(&payload) {
1545            return response;
1546        }
1547        if let Err(response) = self.validate_stream_scope(&stream_id) {
1548            return response;
1549        }
1550        if let Err(response) = validate_producer_request(producer.as_ref()) {
1551            return response;
1552        }
1553        let Some(_) = self.streams.get(&stream_id) else {
1554            return StreamResponse::error(
1555                StreamErrorCode::StreamNotFound,
1556                format!("stream '{stream_id}' does not exist"),
1557            );
1558        };
1559        if self.expire_stream_if_due(&stream_id, now_ms) {
1560            return StreamResponse::error(
1561                StreamErrorCode::StreamNotFound,
1562                format!("stream '{stream_id}' does not exist"),
1563            );
1564        }
1565        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1566            return StreamResponse::error(
1567                StreamErrorCode::StreamGone,
1568                format!("stream '{stream_id}' is gone"),
1569            );
1570        }
1571        let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1572            Ok(decision) => decision,
1573            Err(response) => return response,
1574        };
1575        if let ProducerDecision::Duplicate {
1576            offset,
1577            next_offset,
1578            closed,
1579            producer,
1580            ..
1581        } = producer_decision
1582        {
1583            return StreamResponse::Appended {
1584                offset,
1585                next_offset,
1586                closed,
1587                deduplicated: true,
1588                producer: Some(producer),
1589            };
1590        }
1591
1592        let Some(stream) = self.streams.get(&stream_id) else {
1593            unreachable!("stream existence checked before producer evaluation");
1594        };
1595        if stream.status == StreamStatus::Closed {
1596            return StreamResponse::error_with_next_offset(
1597                StreamErrorCode::StreamClosed,
1598                format!("stream '{stream_id}' is closed"),
1599                stream.tail_offset,
1600            );
1601        }
1602        let Some(content_type) = content_type else {
1603            return StreamResponse::error(
1604                StreamErrorCode::MissingContentType,
1605                "append with a body must include content type",
1606            );
1607        };
1608        if content_type != stream.content_type {
1609            return StreamResponse::error_with_next_offset(
1610                StreamErrorCode::ContentTypeMismatch,
1611                format!(
1612                    "append content type '{content_type}' does not match stream content type '{}'",
1613                    stream.content_type
1614                ),
1615                stream.tail_offset,
1616            );
1617        }
1618        if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1619            return response;
1620        }
1621        let offset = stream.tail_offset;
1622        let next_offset = offset.saturating_add(payload.payload_len);
1623        let stream = self
1624            .streams
1625            .get_mut(&stream_id)
1626            .expect("stream existence checked before external append mutation");
1627        stream.tail_offset = next_offset;
1628        if let Some(seq) = stream_seq {
1629            stream.last_stream_seq = Some(seq);
1630        }
1631        renew_stream_ttl(stream, now_ms);
1632        if close_after {
1633            stream.status = StreamStatus::Closed;
1634        }
1635        let closed = stream.status == StreamStatus::Closed;
1636        let producer_ack = producer.clone();
1637        if let Some(producer) = producer {
1638            self.record_producer_success(
1639                stream_id.clone(),
1640                producer,
1641                ProducerAppendRecord {
1642                    start_offset: offset,
1643                    next_offset,
1644                    closed,
1645                },
1646                vec![ProducerAppendRecord {
1647                    start_offset: offset,
1648                    next_offset,
1649                    closed,
1650                }],
1651            );
1652        }
1653        self.external_segments
1654            .entry(stream_id.clone())
1655            .or_default()
1656            .push(ObjectPayloadRef {
1657                start_offset: offset,
1658                end_offset: next_offset,
1659                s3_path: payload.s3_path,
1660                object_size: payload.object_size,
1661            });
1662        self.message_records
1663            .entry(stream_id.clone())
1664            .or_default()
1665            .push(StreamMessageRecord {
1666                start_offset: offset,
1667                end_offset: next_offset,
1668            });
1669        StreamResponse::Appended {
1670            offset,
1671            next_offset,
1672            closed: close_after,
1673            deduplicated: false,
1674            producer: producer_ack,
1675        }
1676    }
1677
1678    pub fn append_batch_borrowed(
1679        &mut self,
1680        stream_id: BucketStreamId,
1681        content_type: Option<&str>,
1682        payloads: &[&[u8]],
1683        producer: Option<ProducerRequest>,
1684        now_ms: u64,
1685    ) -> Result<StreamBatchAppend, StreamResponse> {
1686        if payloads.is_empty() {
1687            return Err(StreamResponse::error(
1688                StreamErrorCode::EmptyAppend,
1689                "append batch must contain at least one payload",
1690            ));
1691        }
1692        self.validate_stream_scope(&stream_id)?;
1693        validate_producer_request(producer.as_ref())?;
1694        if self.expire_stream_if_due(&stream_id, now_ms) {
1695            return Err(StreamResponse::error(
1696                StreamErrorCode::StreamNotFound,
1697                format!("stream '{stream_id}' does not exist"),
1698            ));
1699        }
1700        if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1701            return Err(StreamResponse::error(
1702                StreamErrorCode::StreamGone,
1703                format!("stream '{stream_id}' is gone"),
1704            ));
1705        }
1706        let producer_decision = self.evaluate_producer(&stream_id, producer.as_ref())?;
1707        if let ProducerDecision::Duplicate { items, .. } = producer_decision {
1708            return Ok(StreamBatchAppend {
1709                items: items
1710                    .into_iter()
1711                    .map(|item| StreamBatchAppendItem {
1712                        offset: item.start_offset,
1713                        next_offset: item.next_offset,
1714                        closed: item.closed,
1715                        deduplicated: true,
1716                    })
1717                    .collect(),
1718                deduplicated: true,
1719            });
1720        }
1721
1722        let Some(stream) = self.streams.get_mut(&stream_id) else {
1723            return Err(StreamResponse::error(
1724                StreamErrorCode::StreamNotFound,
1725                format!("stream '{stream_id}' does not exist"),
1726            ));
1727        };
1728        if stream.status == StreamStatus::Closed {
1729            return Err(StreamResponse::error_with_next_offset(
1730                StreamErrorCode::StreamClosed,
1731                format!("stream '{stream_id}' is closed"),
1732                stream.tail_offset,
1733            ));
1734        }
1735        let Some(content_type) = content_type else {
1736            return Err(StreamResponse::error(
1737                StreamErrorCode::MissingContentType,
1738                "append batch must include content type",
1739            ));
1740        };
1741        if content_type != stream.content_type {
1742            return Err(StreamResponse::error_with_next_offset(
1743                StreamErrorCode::ContentTypeMismatch,
1744                format!(
1745                    "append content type '{content_type}' does not match stream content type '{}'",
1746                    stream.content_type
1747                ),
1748                stream.tail_offset,
1749            ));
1750        }
1751        if payloads.iter().any(|payload| payload.is_empty()) {
1752            return Err(StreamResponse::error(
1753                StreamErrorCode::EmptyAppend,
1754                "append batch payloads must be non-empty",
1755            ));
1756        }
1757
1758        let mut items = Vec::with_capacity(payloads.len());
1759        for payload in payloads {
1760            let offset = stream.tail_offset;
1761            let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1762            stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1763            items.push(ProducerAppendRecord {
1764                start_offset: offset,
1765                next_offset: stream.tail_offset,
1766                closed: false,
1767            });
1768        }
1769        let last = items
1770            .last()
1771            .expect("payloads checked non-empty before append")
1772            .clone();
1773        renew_stream_ttl(stream, now_ms);
1774        if let Some(producer) = producer {
1775            self.record_producer_success(stream_id.clone(), producer, last.clone(), items.clone());
1776        }
1777        let payload_store = self
1778            .payloads
1779            .get_mut(&stream_id)
1780            .expect("payload vector exists for stream metadata");
1781        let hot_segments = self.hot_segments.entry(stream_id.clone()).or_default();
1782        for (item, payload) in items.iter().zip(payloads.iter()) {
1783            let payload_start = payload_store.len();
1784            payload_store.extend_from_slice(payload);
1785            let payload_end = payload_store.len();
1786            hot_segments.push(HotPayloadSegment {
1787                start_offset: item.start_offset,
1788                end_offset: item.next_offset,
1789                payload_start,
1790                payload_end,
1791            });
1792        }
1793        self.refresh_hot_start_offset(&stream_id);
1794        self.message_records
1795            .entry(stream_id.clone())
1796            .or_default()
1797            .extend(items.iter().map(|item| StreamMessageRecord {
1798                start_offset: item.start_offset,
1799                end_offset: item.next_offset,
1800            }));
1801        Ok(StreamBatchAppend {
1802            items: items
1803                .into_iter()
1804                .map(|item| StreamBatchAppendItem {
1805                    offset: item.start_offset,
1806                    next_offset: item.next_offset,
1807                    closed: item.closed,
1808                    deduplicated: false,
1809                })
1810                .collect(),
1811            deduplicated: false,
1812        })
1813    }
1814
1815    fn close(
1816        &mut self,
1817        stream_id: BucketStreamId,
1818        stream_seq: Option<String>,
1819        producer: Option<ProducerRequest>,
1820        now_ms: u64,
1821    ) -> StreamResponse {
1822        self.append_borrowed(AppendStreamInput {
1823            stream_id,
1824            content_type: None,
1825            payload: &[],
1826            close_after: true,
1827            stream_seq,
1828            producer,
1829            now_ms,
1830        })
1831    }
1832
1833    fn delete_stream(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
1834        if let Err(response) = self.validate_stream_scope(stream_id) {
1835            return response;
1836        }
1837        let Some(stream) = self.streams.get_mut(stream_id) else {
1838            return StreamResponse::error(
1839                StreamErrorCode::StreamNotFound,
1840                format!("stream '{stream_id}' does not exist"),
1841            );
1842        };
1843        if is_soft_deleted(stream) {
1844            return StreamResponse::error(
1845                StreamErrorCode::StreamGone,
1846                format!("stream '{stream_id}' is gone"),
1847            );
1848        }
1849        if stream.fork_ref_count > 0 {
1850            stream.status = StreamStatus::SoftDeleted;
1851            return StreamResponse::Deleted {
1852                hard_deleted: false,
1853                parent_to_release: None,
1854            };
1855        }
1856        let parent_to_release = stream.forked_from.clone();
1857        self.remove_stream_state(stream_id);
1858        StreamResponse::Deleted {
1859            hard_deleted: true,
1860            parent_to_release,
1861        }
1862    }
1863
1864    fn add_fork_ref(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> StreamResponse {
1865        if let Err(response) = self.validate_stream_scope(stream_id) {
1866            return response;
1867        }
1868        if self.expire_stream_if_due(stream_id, now_ms) {
1869            return StreamResponse::error(
1870                StreamErrorCode::StreamNotFound,
1871                format!("stream '{stream_id}' does not exist"),
1872            );
1873        }
1874        let Some(stream) = self.streams.get_mut(stream_id) else {
1875            return StreamResponse::error(
1876                StreamErrorCode::StreamNotFound,
1877                format!("stream '{stream_id}' does not exist"),
1878            );
1879        };
1880        if is_soft_deleted(stream) {
1881            return StreamResponse::error(
1882                StreamErrorCode::StreamGone,
1883                format!("stream '{stream_id}' is gone"),
1884            );
1885        }
1886        stream.fork_ref_count = stream.fork_ref_count.saturating_add(1);
1887        StreamResponse::ForkRefAdded {
1888            fork_ref_count: stream.fork_ref_count,
1889        }
1890    }
1891
1892    fn release_fork_ref(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
1893        if let Err(response) = self.validate_stream_scope(stream_id) {
1894            return response;
1895        }
1896        let Some(stream) = self.streams.get_mut(stream_id) else {
1897            return StreamResponse::ForkRefReleased {
1898                hard_deleted: false,
1899                fork_ref_count: 0,
1900                parent_to_release: None,
1901            };
1902        };
1903        if stream.fork_ref_count == 0 {
1904            return StreamResponse::error(
1905                StreamErrorCode::InvalidFork,
1906                format!("stream '{stream_id}' has no fork reference to release"),
1907            );
1908        }
1909        stream.fork_ref_count -= 1;
1910        if stream.fork_ref_count == 0 && is_soft_deleted(stream) {
1911            let parent_to_release = stream.forked_from.clone();
1912            self.remove_stream_state(stream_id);
1913            return StreamResponse::ForkRefReleased {
1914                hard_deleted: true,
1915                fork_ref_count: 0,
1916                parent_to_release,
1917            };
1918        }
1919        StreamResponse::ForkRefReleased {
1920            hard_deleted: false,
1921            fork_ref_count: stream.fork_ref_count,
1922            parent_to_release: None,
1923        }
1924    }
1925
1926    fn touch_stream_access(
1927        &mut self,
1928        stream_id: &BucketStreamId,
1929        now_ms: u64,
1930        renew_ttl: bool,
1931    ) -> StreamResponse {
1932        if let Err(response) = self.validate_stream_scope(stream_id) {
1933            return response;
1934        }
1935        let Some(stream) = self.streams.get(stream_id) else {
1936            return StreamResponse::error(
1937                StreamErrorCode::StreamNotFound,
1938                format!("stream '{stream_id}' does not exist"),
1939            );
1940        };
1941        if is_soft_deleted(stream) {
1942            return StreamResponse::error(
1943                StreamErrorCode::StreamGone,
1944                format!("stream '{stream_id}' is gone"),
1945            );
1946        }
1947        if stream_is_expired(stream, now_ms) {
1948            self.remove_stream_state(stream_id);
1949            return StreamResponse::Accessed {
1950                changed: true,
1951                expired: true,
1952            };
1953        }
1954        let changed = if renew_ttl && stream.stream_ttl_seconds.is_some() {
1955            let stream = self
1956                .streams
1957                .get_mut(stream_id)
1958                .expect("stream existence checked before TTL renewal");
1959            let previous = stream.last_ttl_touch_at_ms;
1960            renew_stream_ttl(stream, now_ms);
1961            stream.last_ttl_touch_at_ms != previous
1962        } else {
1963            false
1964        };
1965        StreamResponse::Accessed {
1966            changed,
1967            expired: false,
1968        }
1969    }
1970
1971    fn expire_stream_if_due(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> bool {
1972        if self
1973            .streams
1974            .get(stream_id)
1975            .is_some_and(|stream| stream_is_expired(stream, now_ms))
1976        {
1977            self.remove_stream_state(stream_id);
1978            return true;
1979        }
1980        false
1981    }
1982
1983    fn remove_stream_state(&mut self, stream_id: &BucketStreamId) -> bool {
1984        if self.streams.remove(stream_id).is_some() {
1985            self.payloads.remove(stream_id);
1986            self.hot_segments.remove(stream_id);
1987            self.hot_start_offsets.remove(stream_id);
1988            self.cold_chunks.remove(stream_id);
1989            self.external_segments.remove(stream_id);
1990            self.message_records.remove(stream_id);
1991            self.visible_snapshots.remove(stream_id);
1992            self.producers.remove(stream_id);
1993            true
1994        } else {
1995            false
1996        }
1997    }
1998
1999    fn validate_stream_scope(&self, stream_id: &BucketStreamId) -> Result<(), StreamResponse> {
2000        if let Err(message) = validate_bucket_id(&stream_id.bucket_id) {
2001            return Err(StreamResponse::error(
2002                StreamErrorCode::InvalidBucketId,
2003                message,
2004            ));
2005        }
2006        if let Err(message) = validate_stream_id(stream_id) {
2007            return Err(StreamResponse::error(
2008                StreamErrorCode::InvalidStreamId,
2009                message,
2010            ));
2011        }
2012        if !self.buckets.contains(&stream_id.bucket_id) {
2013            return Err(StreamResponse::error(
2014                StreamErrorCode::BucketNotFound,
2015                format!("bucket '{}' does not exist", stream_id.bucket_id),
2016            ));
2017        }
2018        Ok(())
2019    }
2020
2021    fn earliest_retained_offset(&self, stream_id: &BucketStreamId) -> u64 {
2022        self.visible_snapshots
2023            .get(stream_id)
2024            .map(|snapshot| snapshot.offset)
2025            .unwrap_or(0)
2026    }
2027
2028    fn snapshot_offset_aligned(
2029        &self,
2030        stream_id: &BucketStreamId,
2031        snapshot_offset: u64,
2032        retained_offset: u64,
2033    ) -> bool {
2034        snapshot_offset == retained_offset
2035            || self.message_records.get(stream_id).is_some_and(|records| {
2036                records
2037                    .iter()
2038                    .any(|record| record.end_offset == snapshot_offset)
2039            })
2040    }
2041
2042    fn compact_retained_prefix(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
2043        if let Some(records) = self.message_records.get_mut(stream_id) {
2044            records.retain(|record| record.end_offset > retained_offset);
2045            if records.is_empty() {
2046                self.message_records.remove(stream_id);
2047            }
2048        }
2049        if let Some(chunks) = self.cold_chunks.get_mut(stream_id) {
2050            chunks.retain(|chunk| chunk.end_offset > retained_offset);
2051            if chunks.is_empty() {
2052                self.cold_chunks.remove(stream_id);
2053            }
2054        }
2055        if let Some(objects) = self.external_segments.get_mut(stream_id) {
2056            objects.retain(|object| object.end_offset > retained_offset);
2057            if objects.is_empty() {
2058                self.external_segments.remove(stream_id);
2059            }
2060        }
2061
2062        self.discard_hot_prefix_before(stream_id, retained_offset);
2063    }
2064
2065    fn refresh_hot_start_offset(&mut self, stream_id: &BucketStreamId) {
2066        let Some(hot_start_offset) = self
2067            .hot_segments
2068            .get(stream_id)
2069            .and_then(|segments| segments.iter().map(|segment| segment.start_offset).min())
2070        else {
2071            self.hot_start_offsets.remove(stream_id);
2072            return;
2073        };
2074        if hot_start_offset == 0 {
2075            self.hot_start_offsets.remove(stream_id);
2076        } else {
2077            self.hot_start_offsets
2078                .insert(stream_id.clone(), hot_start_offset);
2079        }
2080    }
2081
    /// Rewrites the stream's hot-segment index after `drained_len` bytes starting
    /// at `drain_start` were removed from its payload vector.
    ///
    /// Per segment:
    /// - index below `segment_index`, or payload ending at/before `drain_start`:
    ///   kept unchanged;
    /// - payload starting at/after the drained range: payload indices shifted
    ///   down by `drained_len`;
    /// - payload entirely inside the drained range: dropped;
    /// - otherwise (partial overlap): trimmed to start at `new_start_offset` /
    ///   `drain_start`.
    ///
    /// Afterwards an emptied segment list is removed, and the cached hot start
    /// offset is refreshed.
    ///
    /// NOTE(review): segments with index below `segment_index` are never shifted.
    /// This is only correct if their payload bytes lie before `drain_start`;
    /// presumably segments are stored in offset/payload order — confirm.
    fn remove_drained_hot_range(
        &mut self,
        stream_id: &BucketStreamId,
        segment_index: usize,
        new_start_offset: u64,
        drain_start: usize,
        drained_len: usize,
    ) {
        let Some(segments) = self.hot_segments.get_mut(stream_id) else {
            // No hot segments at all: nothing to re-index, just clear the cache.
            self.hot_start_offsets.remove(stream_id);
            return;
        };
        if segment_index >= segments.len() {
            self.refresh_hot_start_offset(stream_id);
            return;
        }
        let drain_end = drain_start + drained_len;
        let mut updated_segments = Vec::with_capacity(segments.len());
        for (index, mut segment) in segments.drain(..).enumerate() {
            // Untouched: before the target segment, or payload wholly before the drain.
            if index < segment_index || segment.payload_end <= drain_start {
                updated_segments.push(segment);
                continue;
            }
            // Payload wholly after the drain: only its byte indices move.
            if segment.payload_start >= drain_end {
                segment.payload_start -= drained_len;
                segment.payload_end -= drained_len;
                updated_segments.push(segment);
                continue;
            }
            // Payload wholly inside the drained bytes: the segment disappears.
            if segment.payload_end <= drain_end {
                continue;
            }
            // Partial overlap: keep the surviving suffix. This assumes the segment's
            // payload began at `drain_start` (true for the segment whose
            // `payload_start` the caller used as `drain_start`).
            segment.start_offset = new_start_offset;
            segment.payload_start = drain_start;
            segment.payload_end -= drained_len;
            updated_segments.push(segment);
        }
        *segments = updated_segments;
        if segments.is_empty() {
            self.hot_segments.remove(stream_id);
        }
        self.refresh_hot_start_offset(stream_id);
    }
2125
    /// Removes hot payload bytes for offsets below `retained_offset`.
    ///
    /// Repeatedly picks the first hot segment that starts before
    /// `retained_offset`:
    /// - a segment ending at/before `retained_offset` is drained completely;
    /// - a segment straddling `retained_offset` loses only its prefix.
    ///
    /// The drained bytes are removed from the payload vector, and the segment
    /// index is fixed up by `remove_drained_hot_range`.
    ///
    /// `self.hot_segments(stream_id)` is an accessor defined outside this chunk;
    /// presumably it returns this stream's segments (empty when none) — confirm.
    ///
    /// # Panics
    ///
    /// Panics if hot segments exist but the stream has no payload vector.
    fn discard_hot_prefix_before(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
        while let Some(segment_index) = self
            .hot_segments(stream_id)
            .iter()
            .position(|segment| segment.start_offset < retained_offset)
        {
            let segment = self.hot_segments(stream_id)[segment_index].clone();
            // Drain up to the retained offset, but never past this segment's end.
            let new_start_offset = retained_offset.min(segment.end_offset);
            // Assumes one offset unit per payload byte within a hot segment — confirm.
            let drained_len = usize::try_from(new_start_offset - segment.start_offset)
                .expect("drain len fits usize");
            // Given start_offset < retained_offset, this is zero only for a segment
            // with end_offset == start_offset.
            // NOTE(review): `break` ends the whole loop, so any later segments that
            // still start before `retained_offset` are left in place; confirm that
            // zero-length hot segments cannot occur.
            if drained_len == 0 {
                break;
            }
            let drain_start = segment.payload_start;
            let drain_end = drain_start + drained_len;
            self.payloads
                .get_mut(stream_id)
                .expect("payload vector exists for stream metadata")
                .drain(drain_start..drain_end);
            self.remove_drained_hot_range(
                stream_id,
                segment_index,
                new_start_offset,
                drain_start,
                drained_len,
            );
        }
        self.refresh_hot_start_offset(stream_id);
    }
2155
2156    fn producer_snapshot(&self, stream_id: &BucketStreamId) -> Vec<ProducerSnapshot> {
2157        let mut producer_states = self
2158            .producers
2159            .get(stream_id)
2160            .into_iter()
2161            .flat_map(|states| states.iter())
2162            .map(|(producer_id, state)| ProducerSnapshot {
2163                producer_id: producer_id.clone(),
2164                producer_epoch: state.producer_epoch,
2165                producer_seq: state.producer_seq,
2166                last_start_offset: state.last_start_offset,
2167                last_next_offset: state.last_next_offset,
2168                last_closed: state.last_closed,
2169                last_items: state.last_items.clone(),
2170            })
2171            .collect::<Vec<_>>();
2172        producer_states.sort_by(|left, right| left.producer_id.cmp(&right.producer_id));
2173        producer_states
2174    }
2175
2176    fn evaluate_producer(
2177        &self,
2178        stream_id: &BucketStreamId,
2179        producer: Option<&ProducerRequest>,
2180    ) -> Result<ProducerDecision, StreamResponse> {
2181        let Some(producer) = producer else {
2182            return Ok(ProducerDecision::Accept);
2183        };
2184        let Some(states) = self.producers.get(stream_id) else {
2185            return Ok(ProducerDecision::Accept);
2186        };
2187        let Some(state) = states.get(&producer.producer_id) else {
2188            if producer.producer_seq == 0 {
2189                return Ok(ProducerDecision::Accept);
2190            }
2191            return Err(StreamResponse::error(
2192                StreamErrorCode::ProducerSeqConflict,
2193                format!(
2194                    "producer '{}' expected sequence 0, received {}",
2195                    producer.producer_id, producer.producer_seq
2196                ),
2197            ));
2198        };
2199
2200        if producer.producer_epoch < state.producer_epoch {
2201            return Err(StreamResponse::error(
2202                StreamErrorCode::ProducerEpochStale,
2203                format!(
2204                    "producer '{}' epoch {} is stale; current epoch is {}",
2205                    producer.producer_id, producer.producer_epoch, state.producer_epoch
2206                ),
2207            ));
2208        }
2209        if producer.producer_epoch > state.producer_epoch {
2210            if producer.producer_seq == 0 {
2211                return Ok(ProducerDecision::Accept);
2212            }
2213            return Err(StreamResponse::error(
2214                StreamErrorCode::InvalidProducer,
2215                format!(
2216                    "producer '{}' new epoch {} must start at sequence 0",
2217                    producer.producer_id, producer.producer_epoch
2218                ),
2219            ));
2220        }
2221
2222        if producer.producer_seq <= state.producer_seq {
2223            return Ok(ProducerDecision::Duplicate {
2224                offset: state.last_start_offset,
2225                next_offset: state.last_next_offset,
2226                closed: state.last_closed,
2227                producer: ProducerRequest {
2228                    producer_id: producer.producer_id.clone(),
2229                    producer_epoch: state.producer_epoch,
2230                    producer_seq: state.producer_seq,
2231                },
2232                items: state.last_items.clone(),
2233            });
2234        }
2235        if producer.producer_seq == state.producer_seq + 1 {
2236            return Ok(ProducerDecision::Accept);
2237        }
2238        Err(StreamResponse::error(
2239            StreamErrorCode::ProducerSeqConflict,
2240            format!(
2241                "producer '{}' expected sequence {}, received {}",
2242                producer.producer_id,
2243                state.producer_seq + 1,
2244                producer.producer_seq
2245            ),
2246        ))
2247    }
2248
2249    fn record_producer_success(
2250        &mut self,
2251        stream_id: BucketStreamId,
2252        producer: ProducerRequest,
2253        last: ProducerAppendRecord,
2254        last_items: Vec<ProducerAppendRecord>,
2255    ) {
2256        self.producers.entry(stream_id).or_default().insert(
2257            producer.producer_id,
2258            ProducerState {
2259                producer_epoch: producer.producer_epoch,
2260                producer_seq: producer.producer_seq,
2261                last_start_offset: last.start_offset,
2262                last_next_offset: last.next_offset,
2263                last_closed: last.closed,
2264                last_items,
2265            },
2266        );
2267    }
2268}
2269
/// Owned arguments for creating a stream whose initial body is held inline as
/// bytes.
///
/// NOTE(review): built and consumed outside this chunk; the field notes below
/// are hedged where only the name suggests the meaning — confirm against the
/// create path.
#[derive(Debug)]
struct CreateStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    /// Inline initial body; its byte length is reported by `initial_len`.
    initial_payload: Vec<u8>,
    /// Presumably closes the stream after the initial body is written.
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    /// Sliding TTL in seconds; `validate_retention` rejects setting this together
    /// with `stream_expires_at_ms` (presumably applied on create — confirm).
    stream_ttl_seconds: Option<u64>,
    /// Absolute expiry deadline in milliseconds.
    stream_expires_at_ms: Option<u64>,
    /// Source stream when creating a fork.
    forked_from: Option<BucketStreamId>,
    /// Presumably the offset in `forked_from` at which the fork branches — confirm.
    fork_offset: Option<u64>,
    now_ms: u64,
}
2284
/// Owned arguments for creating a stream whose initial body is an external
/// payload reference rather than inline bytes.
///
/// Mirrors `CreateStreamInput` field for field, except for `initial_payload`.
/// NOTE(review): built and consumed outside this chunk — confirm field semantics
/// against the create path.
#[derive(Debug)]
struct CreateExternalStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    /// Reference to the externally stored initial body.
    initial_payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    stream_ttl_seconds: Option<u64>,
    stream_expires_at_ms: Option<u64>,
    forked_from: Option<BucketStreamId>,
    fork_offset: Option<u64>,
    now_ms: u64,
}
2299
/// Outcome of `StreamStateMachine::evaluate_producer` for an accepted request.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProducerDecision {
    /// The append is new for this producer and should be applied.
    Accept,
    /// The request repeats an already-applied sequence. The fields replay the
    /// producer's last recorded step: its offsets, closed flag, and the stored
    /// epoch/sequence.
    Duplicate {
        offset: u64,
        next_offset: u64,
        closed: bool,
        producer: ProducerRequest,
        /// Per-item records of the last step (one per payload for batch appends).
        items: Vec<ProducerAppendRecord>,
    },
}
2311
2312impl CreateStreamInput {
2313    fn initial_len(&self) -> u64 {
2314        u64::try_from(self.initial_payload.len()).expect("payload len fits u64")
2315    }
2316}
2317
2318fn status_from_closed(closed: bool) -> StreamStatus {
2319    if closed {
2320        StreamStatus::Closed
2321    } else {
2322        StreamStatus::Open
2323    }
2324}
2325
2326fn is_soft_deleted(stream: &StreamMetadata) -> bool {
2327    stream.status == StreamStatus::SoftDeleted
2328}
2329
2330fn validate_retention(
2331    stream_ttl_seconds: Option<u64>,
2332    stream_expires_at_ms: Option<u64>,
2333) -> Result<(), StreamResponse> {
2334    if stream_ttl_seconds.is_some() && stream_expires_at_ms.is_some() {
2335        return Err(StreamResponse::error(
2336            StreamErrorCode::InvalidRetention,
2337            "stream ttl and expires-at cannot both be set",
2338        ));
2339    }
2340    if let Some(ttl_seconds) = stream_ttl_seconds
2341        && ttl_seconds.checked_mul(1000).is_none()
2342    {
2343        return Err(StreamResponse::error(
2344            StreamErrorCode::InvalidRetention,
2345            "stream ttl overflows millisecond range",
2346        ));
2347    }
2348    Ok(())
2349}
2350
2351fn stream_expiry_at_ms(stream: &StreamMetadata) -> Option<u64> {
2352    if let Some(expires_at_ms) = stream.stream_expires_at_ms {
2353        return Some(expires_at_ms);
2354    }
2355    stream.stream_ttl_seconds.map(|ttl_seconds| {
2356        stream
2357            .last_ttl_touch_at_ms
2358            .saturating_add(ttl_seconds.saturating_mul(1000))
2359    })
2360}
2361
2362fn stream_is_expired(stream: &StreamMetadata, now_ms: u64) -> bool {
2363    stream_expiry_at_ms(stream).is_some_and(|expires_at_ms| now_ms >= expires_at_ms)
2364}
2365
2366fn renew_stream_ttl(stream: &mut StreamMetadata, now_ms: u64) {
2367    if stream.stream_ttl_seconds.is_some() && stream.stream_expires_at_ms.is_none() {
2368        stream.last_ttl_touch_at_ms = now_ms;
2369    }
2370}
2371
2372fn check_stream_seq(stream: &StreamMetadata, incoming: Option<&str>) -> Result<(), StreamResponse> {
2373    let Some(incoming) = incoming else {
2374        return Ok(());
2375    };
2376    if let Some(last) = stream.last_stream_seq.as_deref()
2377        && incoming <= last
2378    {
2379        return Err(StreamResponse::error_with_next_offset(
2380            StreamErrorCode::StreamSeqConflict,
2381            format!("stream sequence '{incoming}' is not greater than last sequence '{last}'"),
2382            stream.tail_offset,
2383        ));
2384    }
2385    Ok(())
2386}
2387
2388fn validate_producer_request(producer: Option<&ProducerRequest>) -> Result<(), StreamResponse> {
2389    let Some(producer) = producer else {
2390        return Ok(());
2391    };
2392    if producer.producer_id.trim().is_empty() {
2393        return Err(StreamResponse::error(
2394            StreamErrorCode::InvalidProducer,
2395            "producer id must not be empty",
2396        ));
2397    }
2398    const MAX_JS_SAFE_INTEGER: u64 = 9_007_199_254_740_991;
2399    if producer.producer_epoch > MAX_JS_SAFE_INTEGER {
2400        return Err(StreamResponse::error(
2401            StreamErrorCode::InvalidProducer,
2402            format!(
2403                "producer epoch {} exceeds maximum {}",
2404                producer.producer_epoch, MAX_JS_SAFE_INTEGER
2405            ),
2406        ));
2407    }
2408    if producer.producer_seq > MAX_JS_SAFE_INTEGER {
2409        return Err(StreamResponse::error(
2410            StreamErrorCode::InvalidProducer,
2411            format!(
2412                "producer sequence {} exceeds maximum {}",
2413                producer.producer_seq, MAX_JS_SAFE_INTEGER
2414            ),
2415        ));
2416    }
2417    Ok(())
2418}
2419
2420fn validate_external_payload_ref(payload: &ExternalPayloadRef) -> Result<(), StreamResponse> {
2421    if payload.s3_path.trim().is_empty() {
2422        return Err(StreamResponse::error(
2423            StreamErrorCode::InvalidColdFlush,
2424            "external payload S3 path must not be empty",
2425        ));
2426    }
2427    if payload.payload_len == 0 {
2428        return Err(StreamResponse::error(
2429            StreamErrorCode::EmptyAppend,
2430            "external payload length must be greater than zero",
2431        ));
2432    }
2433    if payload.object_size < payload.payload_len {
2434        return Err(StreamResponse::error(
2435            StreamErrorCode::InvalidColdFlush,
2436            "external payload object size must cover payload length",
2437        ));
2438    }
2439    Ok(())
2440}
2441
2442fn restore_producer_states(
2443    stream_id: &BucketStreamId,
2444    snapshots: Vec<ProducerSnapshot>,
2445) -> Result<HashMap<String, ProducerState>, StreamSnapshotError> {
2446    let mut states = HashMap::with_capacity(snapshots.len());
2447    for snapshot in snapshots {
2448        if states
2449            .insert(
2450                snapshot.producer_id.clone(),
2451                ProducerState {
2452                    producer_epoch: snapshot.producer_epoch,
2453                    producer_seq: snapshot.producer_seq,
2454                    last_start_offset: snapshot.last_start_offset,
2455                    last_next_offset: snapshot.last_next_offset,
2456                    last_closed: snapshot.last_closed,
2457                    last_items: snapshot.last_items,
2458                },
2459            )
2460            .is_some()
2461        {
2462            return Err(StreamSnapshotError::DuplicateProducer {
2463                stream_id: stream_id.clone(),
2464                producer_id: snapshot.producer_id,
2465            });
2466        }
2467    }
2468    Ok(states)
2469}
2470
2471fn valid_cold_chunk_ref(chunk: &ColdChunkRef) -> bool {
2472    chunk.end_offset > chunk.start_offset
2473        && !chunk.s3_path.trim().is_empty()
2474        && chunk.object_size >= chunk.end_offset - chunk.start_offset
2475}
2476
2477fn valid_object_payload_ref(object: &ObjectPayloadRef) -> bool {
2478    object.end_offset > object.start_offset
2479        && !object.s3_path.trim().is_empty()
2480        && object.object_size >= object.end_offset - object.start_offset
2481}
2482
2483fn hot_segments_match_payload(segments: &[HotPayloadSegment], payload_len: usize) -> bool {
2484    let mut expected_payload_start = 0;
2485    for segment in segments {
2486        if segment.end_offset <= segment.start_offset
2487            || segment.payload_start != expected_payload_start
2488            || segment.payload_end <= segment.payload_start
2489            || segment.payload_end > payload_len
2490        {
2491            return false;
2492        }
2493        let Ok(logical_len) = usize::try_from(segment.end_offset - segment.start_offset) else {
2494            return false;
2495        };
2496        if logical_len != segment.payload_end - segment.payload_start {
2497            return false;
2498        }
2499        expected_payload_start = segment.payload_end;
2500    }
2501    expected_payload_start == payload_len
2502}
2503
2504fn payload_sources_cover_retained_suffix(
2505    cold_chunks: &[ColdChunkRef],
2506    external_segments: &[ObjectPayloadRef],
2507    hot_segments: &[HotPayloadSegment],
2508    retained_offset: u64,
2509    tail_offset: u64,
2510) -> bool {
2511    if tail_offset < retained_offset {
2512        return false;
2513    }
2514    let mut ranges =
2515        Vec::with_capacity(cold_chunks.len() + external_segments.len() + hot_segments.len());
2516    for chunk in cold_chunks {
2517        if !valid_cold_chunk_ref(chunk) {
2518            return false;
2519        }
2520        ranges.push((chunk.start_offset, chunk.end_offset));
2521    }
2522    for object in external_segments {
2523        if !valid_object_payload_ref(object) {
2524            return false;
2525        }
2526        ranges.push((object.start_offset, object.end_offset));
2527    }
2528    for segment in hot_segments {
2529        if segment.end_offset <= segment.start_offset {
2530            return false;
2531        }
2532        ranges.push((segment.start_offset, segment.end_offset));
2533    }
2534    ranges.sort_unstable();
2535
2536    let mut expected_start = retained_offset;
2537    for (start_offset, end_offset) in ranges {
2538        if end_offset <= expected_start {
2539            continue;
2540        }
2541        if start_offset > expected_start {
2542            return false;
2543        }
2544        expected_start = end_offset;
2545        if expected_start >= tail_offset {
2546            return true;
2547        }
2548    }
2549    expected_start == tail_offset
2550}
2551
2552fn segments_cover_range(
2553    segments: &[(u64, StreamReadSegment)],
2554    offset: u64,
2555    next_offset: u64,
2556) -> bool {
2557    if next_offset < offset {
2558        return false;
2559    }
2560    let mut expected_start = offset;
2561    for (segment_start, segment) in segments {
2562        let Some(segment_end) = read_segment_end(*segment_start, segment) else {
2563            return false;
2564        };
2565        if segment_end <= expected_start {
2566            continue;
2567        }
2568        if *segment_start > expected_start {
2569            return false;
2570        }
2571        expected_start = segment_end;
2572        if expected_start >= next_offset {
2573            return true;
2574        }
2575    }
2576    expected_start == next_offset
2577}
2578
2579fn read_segment_end(segment_start: u64, segment: &StreamReadSegment) -> Option<u64> {
2580    match segment {
2581        StreamReadSegment::Object(object) => {
2582            if object.len == 0
2583                || object.read_start_offset != segment_start
2584                || object.read_start_offset < object.object.start_offset
2585            {
2586                return None;
2587            }
2588            let len = u64::try_from(object.len).ok()?;
2589            let segment_end = object.read_start_offset.checked_add(len)?;
2590            if segment_end > object.object.end_offset {
2591                return None;
2592            }
2593            Some(segment_end)
2594        }
2595        StreamReadSegment::Hot(payload) => {
2596            if payload.is_empty() {
2597                return None;
2598            }
2599            let len = u64::try_from(payload.len()).ok()?;
2600            segment_start.checked_add(len)
2601        }
2602    }
2603}
2604
2605fn message_records_cover_retained_suffix(
2606    records: &[StreamMessageRecord],
2607    retained_offset: u64,
2608    tail_offset: u64,
2609) -> bool {
2610    let mut expected_start = retained_offset;
2611    for record in records {
2612        if record.start_offset != expected_start || record.end_offset <= record.start_offset {
2613            return false;
2614        }
2615        expected_start = record.end_offset;
2616    }
2617    expected_start == tail_offset
2618}
2619
2620fn compare_stream_ids(left: &BucketStreamId, right: &BucketStreamId) -> std::cmp::Ordering {
2621    left.bucket_id
2622        .cmp(&right.bucket_id)
2623        .then_with(|| left.stream_id.cmp(&right.stream_id))
2624}
2625
// Unit tests for this module are declared here but defined in a separate
// `tests` module file; they are compiled only for `cfg(test)` builds.
#[cfg(test)]
mod tests;