1use std::collections::{HashMap, HashSet};
2
3use ursula_shard::BucketStreamId;
4
5use crate::command::StreamCommand;
6use crate::model::{
7 AppendExternalInput, AppendStreamInput, ColdChunkRef, ColdFlushCandidate, ExternalPayloadRef,
8 HotPayloadSegment, ObjectPayloadRef, ProducerAppendRecord, ProducerRequest, ProducerSnapshot,
9 ProducerState, StreamBatchAppend, StreamBatchAppendItem, StreamBootstrapPlan,
10 StreamMessageRecord, StreamMetadata, StreamRead, StreamReadObjectSegment, StreamReadPlan,
11 StreamReadSegment, StreamStatus, StreamVisibleSnapshot,
12};
13use crate::response::{StreamErrorCode, StreamResponse};
14use crate::snapshot::{StreamSnapshot, StreamSnapshotEntry, StreamSnapshotError};
15use crate::validate::{validate_bucket_id, validate_stream_id};
16
/// In-memory state machine holding every bucket and stream; commands are applied
/// one at a time through [`StreamStateMachine::apply`].
///
/// All per-stream maps are keyed by [`BucketStreamId`]. `streams` is the set of
/// known streams. The other per-stream maps may lack an entry for a stream, and
/// the slice accessors (`hot_segments`, `cold_chunks`, `external_segments`)
/// treat a missing entry as empty. `payloads` is expected to hold an entry for
/// every stream in `streams`; several methods `expect` one.
#[derive(Debug, Clone, Default)]
pub struct StreamStateMachine {
    /// Names of existing buckets.
    buckets: HashSet<String>,
    /// Metadata for every known stream, including soft-deleted ones.
    streams: HashMap<BucketStreamId, StreamMetadata>,
    /// In-memory (hot) payload bytes per stream; hot segments index into this buffer.
    payloads: HashMap<BucketStreamId, Vec<u8>>,
    /// Mapping from stream offset ranges to byte ranges of the stream's `payloads` buffer.
    hot_segments: HashMap<BucketStreamId, Vec<HotPayloadSegment>>,
    /// Cached hot start offset per stream; a missing entry reads as 0.
    hot_start_offsets: HashMap<BucketStreamId, u64>,
    /// Offset ranges that were drained from `payloads` by `flush_cold`.
    cold_chunks: HashMap<BucketStreamId, Vec<ColdChunkRef>>,
    /// Offset ranges whose payload is served from object references rather than `payloads`.
    external_segments: HashMap<BucketStreamId, Vec<ObjectPayloadRef>>,
    /// Per-message offset records (read by `bootstrap_plan`, validated on `restore`).
    message_records: HashMap<BucketStreamId, Vec<StreamMessageRecord>>,
    /// Latest published snapshot per stream, if any.
    visible_snapshots: HashMap<BucketStreamId, StreamVisibleSnapshot>,
    /// Producer state per stream; inner key is presumably the producer id — confirm.
    producers: HashMap<BucketStreamId, HashMap<String, ProducerState>>,
}
30
31impl StreamStateMachine {
32 pub fn new() -> Self {
33 Self::default()
34 }
35
    /// Applies one command to the state machine and returns the handler's response.
    ///
    /// Each [`StreamCommand`] variant is destructured and forwarded to its matching
    /// handler method; this function performs no validation of its own.
    pub fn apply(&mut self, command: StreamCommand) -> StreamResponse {
        match command {
            StreamCommand::CreateBucket { bucket_id } => self.create_bucket(bucket_id),
            StreamCommand::DeleteBucket { bucket_id } => self.delete_bucket(&bucket_id),
            StreamCommand::CreateStream {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_stream(CreateStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            StreamCommand::CreateExternal {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => self.create_external_stream(CreateExternalStreamInput {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            }),
            StreamCommand::Append {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_borrowed(AppendStreamInput {
                stream_id,
                content_type: content_type.as_deref(),
                // The command owns the payload; the append input only borrows it.
                payload: &payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            StreamCommand::AppendExternal {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self.append_external(AppendExternalInput {
                stream_id,
                content_type: content_type.as_deref(),
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            }),
            StreamCommand::AppendBatch {
                stream_id,
                content_type,
                payloads,
                producer,
                now_ms,
            } => match self.append_batch_borrowed(
                stream_id,
                content_type.as_deref(),
                // View each owned payload as a borrowed slice for the batch handler.
                &payloads.iter().map(Vec::as_slice).collect::<Vec<_>>(),
                producer,
                now_ms,
            ) {
                // The batch is reported as a single append described by its last
                // item; producer details are not included in this response.
                Ok(batch) => batch
                    .items
                    .last()
                    .map(|item| StreamResponse::Appended {
                        offset: item.offset,
                        next_offset: item.next_offset,
                        closed: item.closed,
                        deduplicated: item.deduplicated,
                        producer: None,
                    })
                    // Only reached if the handler succeeded with zero items.
                    .unwrap_or_else(|| {
                        StreamResponse::error(
                            StreamErrorCode::EmptyAppend,
                            "append batch must contain at least one payload",
                        )
                    }),
                Err(response) => response,
            },
            StreamCommand::PublishSnapshot {
                stream_id,
                snapshot_offset,
                content_type,
                payload,
                now_ms,
            } => self.publish_snapshot(stream_id, snapshot_offset, content_type, payload, now_ms),
            StreamCommand::TouchStreamAccess {
                stream_id,
                now_ms,
                renew_ttl,
            } => self.touch_stream_access(&stream_id, now_ms, renew_ttl),
            StreamCommand::AddForkRef { stream_id, now_ms } => {
                self.add_fork_ref(&stream_id, now_ms)
            }
            StreamCommand::ReleaseForkRef { stream_id } => self.release_fork_ref(&stream_id),
            StreamCommand::FlushCold { stream_id, chunk } => self.flush_cold(stream_id, chunk),
            StreamCommand::Close {
                stream_id,
                stream_seq,
                producer,
                now_ms,
            } => self.close(stream_id, stream_seq, producer, now_ms),
            StreamCommand::DeleteStream { stream_id } => self.delete_stream(&stream_id),
        }
    }
181
182 pub fn head(&self, stream_id: &BucketStreamId) -> Option<&StreamMetadata> {
183 self.streams.get(stream_id)
184 }
185
186 pub fn head_at(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> Option<&StreamMetadata> {
187 self.expire_stream_if_due(stream_id, now_ms);
188 self.streams.get(stream_id)
189 }
190
191 pub fn access_requires_write(
192 &self,
193 stream_id: &BucketStreamId,
194 now_ms: u64,
195 renew_ttl: bool,
196 ) -> Result<bool, StreamResponse> {
197 self.validate_stream_scope(stream_id)?;
198 let Some(stream) = self.streams.get(stream_id) else {
199 return Err(StreamResponse::error(
200 StreamErrorCode::StreamNotFound,
201 format!("stream '{stream_id}' does not exist"),
202 ));
203 };
204 if is_soft_deleted(stream) {
205 return Err(StreamResponse::error(
206 StreamErrorCode::StreamGone,
207 format!("stream '{stream_id}' is gone"),
208 ));
209 }
210 if stream_is_expired(stream, now_ms) {
211 return Ok(true);
212 }
213 Ok(renew_ttl
214 && stream.stream_ttl_seconds.is_some()
215 && stream.last_ttl_touch_at_ms != now_ms)
216 }
217
218 pub fn hot_start_offset(&self, stream_id: &BucketStreamId) -> u64 {
219 self.hot_start_offsets.get(stream_id).copied().unwrap_or(0)
220 }
221
222 pub fn cold_chunks(&self, stream_id: &BucketStreamId) -> &[ColdChunkRef] {
223 self.cold_chunks
224 .get(stream_id)
225 .map(Vec::as_slice)
226 .unwrap_or(&[])
227 }
228
229 pub fn external_segments(&self, stream_id: &BucketStreamId) -> &[ObjectPayloadRef] {
230 self.external_segments
231 .get(stream_id)
232 .map(Vec::as_slice)
233 .unwrap_or(&[])
234 }
235
236 pub fn hot_segments(&self, stream_id: &BucketStreamId) -> &[HotPayloadSegment] {
237 self.hot_segments
238 .get(stream_id)
239 .map(Vec::as_slice)
240 .unwrap_or(&[])
241 }
242
243 pub fn hot_payload_len(&self, stream_id: &BucketStreamId) -> Result<u64, StreamResponse> {
244 let Some(stream) = self.streams.get(stream_id) else {
245 return Err(StreamResponse::error(
246 StreamErrorCode::StreamNotFound,
247 format!("stream '{stream_id}' does not exist"),
248 ));
249 };
250 if is_soft_deleted(stream) {
251 return Err(StreamResponse::error(
252 StreamErrorCode::StreamGone,
253 format!("stream '{stream_id}' is gone"),
254 ));
255 }
256 let payload = self
257 .payloads
258 .get(stream_id)
259 .expect("payload vector exists for stream metadata");
260 Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
261 }
262
263 pub fn total_hot_payload_bytes(&self) -> u64 {
264 self.payloads
265 .values()
266 .map(|payload| u64::try_from(payload.len()).expect("payload len fits u64"))
267 .sum()
268 }
269
270 pub fn plan_cold_flush(
271 &self,
272 stream_id: &BucketStreamId,
273 min_hot_bytes: usize,
274 max_flush_bytes: usize,
275 ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
276 if max_flush_bytes == 0 {
277 return Ok(None);
278 }
279 let Some(stream) = self.streams.get(stream_id) else {
280 return Err(StreamResponse::error(
281 StreamErrorCode::StreamNotFound,
282 format!("stream '{stream_id}' does not exist"),
283 ));
284 };
285 if is_soft_deleted(stream) {
286 return Err(StreamResponse::error(
287 StreamErrorCode::StreamGone,
288 format!("stream '{stream_id}' is gone"),
289 ));
290 }
291 let Some(first_segment) = self.hot_segments(stream_id).first() else {
292 return Ok(None);
293 };
294 let mut payload_end = first_segment.payload_start;
295 let mut end_offset = first_segment.start_offset;
296 let mut flush_len = 0usize;
297 for segment in self.hot_segments(stream_id) {
298 if segment.start_offset != end_offset || segment.payload_start != payload_end {
299 break;
300 }
301 let remaining = max_flush_bytes.saturating_sub(flush_len);
302 if remaining == 0 {
303 break;
304 }
305 let segment_len = segment.payload_end - segment.payload_start;
306 let take = segment_len.min(remaining);
307 flush_len += take;
308 payload_end += take;
309 end_offset = end_offset.saturating_add(u64::try_from(take).expect("take fits u64"));
310 if take < segment_len {
311 break;
312 }
313 }
314 if flush_len < min_hot_bytes {
315 return Ok(None);
316 }
317 let payload = self
318 .payloads
319 .get(stream_id)
320 .expect("payload vector exists for stream metadata");
321 let start_offset = first_segment.start_offset;
322 Ok(Some(ColdFlushCandidate {
323 stream_id: stream_id.clone(),
324 start_offset,
325 end_offset,
326 payload: payload[first_segment.payload_start..payload_end].to_vec(),
327 }))
328 }
329
330 pub fn plan_next_cold_flush(
331 &self,
332 min_hot_bytes: usize,
333 max_flush_bytes: usize,
334 ) -> Result<Option<ColdFlushCandidate>, StreamResponse> {
335 if max_flush_bytes == 0 {
336 return Ok(None);
337 }
338 let mut stream_ids = self.streams.keys().cloned().collect::<Vec<_>>();
339 stream_ids.sort_by(compare_stream_ids);
340 for stream_id in stream_ids {
341 match self.plan_cold_flush(&stream_id, min_hot_bytes, max_flush_bytes) {
342 Ok(Some(candidate)) => return Ok(Some(candidate)),
343 Ok(None) => {}
344 Err(StreamResponse::Error {
345 code: StreamErrorCode::StreamGone | StreamErrorCode::StreamNotFound,
346 ..
347 }) => {}
348 Err(err) => return Err(err),
349 }
350 }
351 Ok(None)
352 }
353
354 pub fn plan_next_cold_flush_batch(
355 &self,
356 min_hot_bytes: usize,
357 max_flush_bytes: usize,
358 max_candidates: usize,
359 ) -> Result<Vec<ColdFlushCandidate>, StreamResponse> {
360 if max_candidates == 0 || max_flush_bytes == 0 {
361 return Ok(Vec::new());
362 }
363 let mut preview = self.clone();
364 let mut candidates = Vec::with_capacity(max_candidates);
365 while candidates.len() < max_candidates {
366 let Some(candidate) = preview.plan_next_cold_flush(min_hot_bytes, max_flush_bytes)?
367 else {
368 break;
369 };
370 let chunk = ColdChunkRef {
371 start_offset: candidate.start_offset,
372 end_offset: candidate.end_offset,
373 s3_path: "planned-cold-flush-batch".to_owned(),
374 object_size: u64::try_from(candidate.payload.len()).expect("payload len fits u64"),
375 };
376 match preview.flush_cold(candidate.stream_id.clone(), chunk) {
377 StreamResponse::ColdFlushed { .. } => candidates.push(candidate),
378 StreamResponse::Error { .. } => break,
379 other => {
380 return Err(StreamResponse::error(
381 StreamErrorCode::InvalidColdFlush,
382 format!("unexpected cold flush planning response: {other:?}"),
383 ));
384 }
385 }
386 }
387 Ok(candidates)
388 }
389
390 pub fn bucket_exists(&self, bucket_id: &str) -> bool {
391 self.buckets.contains(bucket_id)
392 }
393
394 pub fn snapshot(&self) -> StreamSnapshot {
395 let mut buckets = self.buckets.iter().cloned().collect::<Vec<_>>();
396 buckets.sort();
397
398 let mut streams = self
399 .streams
400 .values()
401 .cloned()
402 .map(|metadata| {
403 let stream_id = metadata.stream_id.clone();
404 let payload = self
405 .payloads
406 .get(&stream_id)
407 .expect("payload vector exists for stream metadata")
408 .clone();
409 let producer_states = self.producer_snapshot(&stream_id);
410 StreamSnapshotEntry {
411 metadata,
412 hot_start_offset: self.hot_start_offset(&stream_id),
413 payload,
414 hot_segments: self
415 .hot_segments
416 .get(&stream_id)
417 .cloned()
418 .unwrap_or_default(),
419 cold_chunks: self
420 .cold_chunks
421 .get(&stream_id)
422 .cloned()
423 .unwrap_or_default(),
424 external_segments: self
425 .external_segments
426 .get(&stream_id)
427 .cloned()
428 .unwrap_or_default(),
429 message_records: self
430 .message_records
431 .get(&stream_id)
432 .cloned()
433 .unwrap_or_default(),
434 visible_snapshot: self.visible_snapshots.get(&stream_id).cloned(),
435 producer_states,
436 }
437 })
438 .collect::<Vec<_>>();
439 streams.sort_by(|left, right| {
440 compare_stream_ids(&left.metadata.stream_id, &right.metadata.stream_id)
441 });
442
443 StreamSnapshot { buckets, streams }
444 }
445
    /// Rebuilds a state machine from a [`StreamSnapshot`].
    ///
    /// Buckets are inserted first. Each stream entry is then validated before its
    /// state is installed, and the first failure aborts the restore.
    ///
    /// # Errors
    /// - `DuplicateBucket` / `DuplicateStream` when an id appears more than once.
    /// - `MissingBucket` when a stream references a bucket not in the snapshot.
    /// - `SnapshotOffsetOutOfRange` when a visible snapshot lies beyond the tail.
    /// - `PayloadLengthMismatch` in either of these cases:
    ///   - the hot segments do not match the payload buffer;
    ///   - cold, external and hot sources do not cover the retained range, which
    ///     runs from the visible snapshot offset (or 0) to the tail.
    /// - `MessageBoundaryMismatch` when message records do not cover that range.
    /// - Any error returned by `restore_producer_states`.
    pub fn restore(snapshot: StreamSnapshot) -> Result<Self, StreamSnapshotError> {
        let mut machine = Self::default();
        for bucket_id in snapshot.buckets {
            if !machine.buckets.insert(bucket_id.clone()) {
                return Err(StreamSnapshotError::DuplicateBucket(bucket_id));
            }
        }

        for entry in snapshot.streams {
            let stream_id = entry.metadata.stream_id.clone();
            if !machine.buckets.contains(&stream_id.bucket_id) {
                return Err(StreamSnapshotError::MissingBucket(stream_id));
            }
            if let Some(snapshot) = entry.visible_snapshot.as_ref()
                && snapshot.offset > entry.metadata.tail_offset
            {
                return Err(StreamSnapshotError::SnapshotOffsetOutOfRange {
                    stream_id,
                    snapshot_offset: snapshot.offset,
                    tail_offset: entry.metadata.tail_offset,
                });
            }
            // Data before the visible snapshot (or before 0 without one) need not be covered.
            let retained_offset = entry
                .visible_snapshot
                .as_ref()
                .map(|snapshot| snapshot.offset)
                .unwrap_or(0);
            // An entry with payload bytes but no segment list is treated as one hot
            // segment spanning the whole buffer, from `hot_start_offset` to the tail.
            // NOTE(review): presumably for snapshots written before hot segments were
            // recorded — confirm.
            let hot_segments = if entry.hot_segments.is_empty() && !entry.payload.is_empty() {
                vec![HotPayloadSegment {
                    start_offset: entry.hot_start_offset,
                    end_offset: entry.metadata.tail_offset,
                    payload_start: 0,
                    payload_end: entry.payload.len(),
                }]
            } else {
                entry.hot_segments
            };
            if !hot_segments_match_payload(&hot_segments, entry.payload.len())
                || !payload_sources_cover_retained_suffix(
                    &entry.cold_chunks,
                    &entry.external_segments,
                    &hot_segments,
                    retained_offset,
                    entry.metadata.tail_offset,
                )
            {
                return Err(StreamSnapshotError::PayloadLengthMismatch {
                    stream_id,
                    tail_offset: entry.metadata.tail_offset,
                    payload_len: entry.payload.len(),
                });
            }
            if !message_records_cover_retained_suffix(
                &entry.message_records,
                retained_offset,
                entry.metadata.tail_offset,
            ) {
                return Err(StreamSnapshotError::MessageBoundaryMismatch { stream_id });
            }
            // A previous entry under the same id means a duplicate. The overwrite done
            // by `insert` is harmless because the whole machine is discarded on error.
            if machine
                .streams
                .insert(entry.metadata.stream_id.clone(), entry.metadata)
                .is_some()
            {
                return Err(StreamSnapshotError::DuplicateStream(stream_id));
            }
            let producer_states = restore_producer_states(&stream_id, entry.producer_states)?;
            // Auxiliary maps only get entries when there is something to store;
            // accessors treat a missing entry as empty.
            if !hot_segments.is_empty() {
                machine.hot_segments.insert(stream_id.clone(), hot_segments);
            }
            if !entry.cold_chunks.is_empty() {
                machine
                    .cold_chunks
                    .insert(stream_id.clone(), entry.cold_chunks);
            }
            if !entry.external_segments.is_empty() {
                machine
                    .external_segments
                    .insert(stream_id.clone(), entry.external_segments);
            }
            if !entry.message_records.is_empty() {
                machine
                    .message_records
                    .insert(stream_id.clone(), entry.message_records);
            }
            if let Some(snapshot) = entry.visible_snapshot {
                machine
                    .visible_snapshots
                    .insert(stream_id.clone(), snapshot);
            }
            // Payload and producer maps always get an entry (payload may be empty).
            machine.payloads.insert(stream_id.clone(), entry.payload);
            machine.producers.insert(stream_id.clone(), producer_states);
            // The snapshot's `hot_start_offset` is not stored directly; the cache is
            // recomputed from the restored state.
            machine.refresh_hot_start_offset(&stream_id);
        }

        Ok(machine)
    }
543
544 pub fn read(
545 &self,
546 stream_id: &BucketStreamId,
547 offset: u64,
548 max_len: usize,
549 ) -> Result<StreamRead, StreamResponse> {
550 let plan = self.read_plan(stream_id, offset, max_len)?;
551 if plan
552 .segments
553 .iter()
554 .any(|segment| matches!(segment, StreamReadSegment::Object(_)))
555 {
556 return Err(StreamResponse::error_with_next_offset(
557 StreamErrorCode::InvalidColdFlush,
558 format!("stream '{stream_id}' read requires object payload store"),
559 plan.next_offset,
560 ));
561 }
562 let payload = plan
563 .segments
564 .iter()
565 .flat_map(|segment| match segment {
566 StreamReadSegment::Hot(payload) => payload.as_slice(),
567 StreamReadSegment::Object(_) => unreachable!("object segments checked above"),
568 })
569 .copied()
570 .collect();
571 Ok(StreamRead {
572 offset: plan.offset,
573 next_offset: plan.next_offset,
574 content_type: plan.content_type,
575 payload,
576 up_to_date: plan.up_to_date,
577 closed: plan.closed,
578 })
579 }
580
581 pub fn read_plan(
582 &self,
583 stream_id: &BucketStreamId,
584 offset: u64,
585 max_len: usize,
586 ) -> Result<StreamReadPlan, StreamResponse> {
587 self.read_plan_at(stream_id, offset, max_len, 0)
588 }
589
    /// Plans a read of up to `max_len` bytes of `stream_id` starting at `offset`,
    /// evaluating expiry as of `now_ms`.
    ///
    /// `next_offset` is `offset + max_len`, capped at the current tail. The plan
    /// lists, ordered by start offset, the pieces of `[offset, next_offset)` taken
    /// from three sources:
    /// - cold chunks and external objects, as object references;
    /// - the hot payload buffer, with the bytes copied out.
    ///
    /// This method takes `&self`: an expired stream is reported as not found but
    /// is not removed.
    ///
    /// # Errors
    /// - `StreamNotFound` if the stream is unknown or expired at `now_ms`.
    /// - `StreamGone` if the stream is soft-deleted.
    /// - `StreamGone` if `offset` precedes the earliest retained offset; that
    ///   offset is returned as `next_offset`.
    /// - `OffsetOutOfRange` if `offset` is beyond the tail; the tail is returned as
    ///   `next_offset`.
    /// - `InvalidColdFlush` if the collected segments do not cover the requested range.
    pub fn read_plan_at(
        &self,
        stream_id: &BucketStreamId,
        offset: u64,
        max_len: usize,
        now_ms: u64,
    ) -> Result<StreamReadPlan, StreamResponse> {
        let Some(stream) = self.streams.get(stream_id) else {
            return Err(StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            ));
        };
        if is_soft_deleted(stream) {
            return Err(StreamResponse::error(
                StreamErrorCode::StreamGone,
                format!("stream '{stream_id}' is gone"),
            ));
        }
        if stream_is_expired(stream, now_ms) {
            return Err(StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            ));
        }
        if offset > stream.tail_offset {
            return Err(StreamResponse::error_with_next_offset(
                StreamErrorCode::OffsetOutOfRange,
                format!(
                    "offset {offset} is beyond stream '{}' tail {}",
                    stream_id, stream.tail_offset
                ),
                stream.tail_offset,
            ));
        }
        let retained_offset = self.earliest_retained_offset(stream_id);
        if offset < retained_offset {
            return Err(StreamResponse::error_with_next_offset(
                StreamErrorCode::StreamGone,
                format!(
                    "offset {offset} is older than stream '{}' retained offset {retained_offset}",
                    stream_id
                ),
                retained_offset,
            ));
        }

        // Saturating arithmetic so a huge `max_len` simply reads to the tail.
        let max_len_u64 = u64::try_from(max_len).unwrap_or(u64::MAX);
        let next_offset = stream.tail_offset.min(offset.saturating_add(max_len_u64));
        let payload = self
            .payloads
            .get(stream_id)
            .expect("payload vector exists for stream metadata");
        // (start offset, segment) pairs; each source contributes its intersection
        // with [offset, next_offset).
        let mut segments = Vec::<(u64, StreamReadSegment)>::new();
        for chunk in self.cold_chunks(stream_id) {
            let start = offset.max(chunk.start_offset);
            let end = next_offset.min(chunk.end_offset);
            if start < end {
                segments.push((
                    start,
                    StreamReadSegment::Object(StreamReadObjectSegment {
                        object: ObjectPayloadRef::from(chunk),
                        read_start_offset: start,
                        len: usize::try_from(end - start).expect("object read len fits usize"),
                    }),
                ));
            }
        }
        for object in self.external_segments(stream_id) {
            let start = offset.max(object.start_offset);
            let end = next_offset.min(object.end_offset);
            if start < end {
                segments.push((
                    start,
                    StreamReadSegment::Object(StreamReadObjectSegment {
                        object: object.clone(),
                        read_start_offset: start,
                        len: usize::try_from(end - start).expect("object read len fits usize"),
                    }),
                ));
            }
        }
        for segment in self.hot_segments(stream_id) {
            let start = offset.max(segment.start_offset);
            let end = next_offset.min(segment.end_offset);
            if start < end {
                // Translate the offset window into a byte window of the payload buffer.
                let payload_start = segment.payload_start
                    + usize::try_from(start - segment.start_offset)
                        .expect("hot segment start fits usize");
                let payload_end = segment.payload_start
                    + usize::try_from(end - segment.start_offset)
                        .expect("hot segment end fits usize");
                segments.push((
                    start,
                    StreamReadSegment::Hot(payload[payload_start..payload_end].to_vec()),
                ));
            }
        }
        // `sort_by_key` is stable, so segments with equal start offsets keep the
        // cold -> external -> hot push order above.
        segments.sort_by_key(|(start, _)| *start);
        if !segments_cover_range(&segments, offset, next_offset) {
            return Err(StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!("stream '{stream_id}' has missing payload segment metadata"),
                next_offset,
            ));
        }
        Ok(StreamReadPlan {
            offset,
            next_offset,
            content_type: stream.content_type.clone(),
            segments: segments.into_iter().map(|(_, segment)| segment).collect(),
            up_to_date: next_offset == stream.tail_offset,
            closed: stream.status == StreamStatus::Closed,
        })
    }
705
706 pub fn latest_snapshot(
707 &self,
708 stream_id: &BucketStreamId,
709 ) -> Result<Option<StreamVisibleSnapshot>, StreamResponse> {
710 let Some(stream) = self.streams.get(stream_id) else {
711 return Err(StreamResponse::error(
712 StreamErrorCode::StreamNotFound,
713 format!("stream '{stream_id}' does not exist"),
714 ));
715 };
716 if is_soft_deleted(stream) {
717 return Err(StreamResponse::error(
718 StreamErrorCode::StreamGone,
719 format!("stream '{stream_id}' is gone"),
720 ));
721 }
722 Ok(self.visible_snapshots.get(stream_id).cloned())
723 }
724
725 pub fn read_snapshot(
726 &self,
727 stream_id: &BucketStreamId,
728 snapshot_offset: u64,
729 ) -> Result<StreamVisibleSnapshot, StreamResponse> {
730 let snapshot = self.latest_snapshot(stream_id)?;
731 match snapshot {
732 Some(snapshot) if snapshot.offset == snapshot_offset => Ok(snapshot),
733 _ => Err(StreamResponse::error(
734 StreamErrorCode::SnapshotNotFound,
735 format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
736 )),
737 }
738 }
739
740 pub fn delete_snapshot(
741 &self,
742 stream_id: &BucketStreamId,
743 snapshot_offset: u64,
744 ) -> StreamResponse {
745 match self.latest_snapshot(stream_id) {
746 Ok(Some(snapshot)) if snapshot.offset == snapshot_offset => StreamResponse::error(
747 StreamErrorCode::SnapshotConflict,
748 format!(
749 "snapshot {snapshot_offset} for stream '{stream_id}' is the latest visible snapshot"
750 ),
751 ),
752 Ok(_) => StreamResponse::error(
753 StreamErrorCode::SnapshotNotFound,
754 format!("snapshot {snapshot_offset} for stream '{stream_id}' does not exist"),
755 ),
756 Err(err) => err,
757 }
758 }
759
760 pub fn bootstrap_plan(
761 &self,
762 stream_id: &BucketStreamId,
763 ) -> Result<StreamBootstrapPlan, StreamResponse> {
764 let Some(stream) = self.streams.get(stream_id) else {
765 return Err(StreamResponse::error(
766 StreamErrorCode::StreamNotFound,
767 format!("stream '{stream_id}' does not exist"),
768 ));
769 };
770 if is_soft_deleted(stream) {
771 return Err(StreamResponse::error(
772 StreamErrorCode::StreamGone,
773 format!("stream '{stream_id}' is gone"),
774 ));
775 }
776 let snapshot = self.visible_snapshots.get(stream_id).cloned();
777 let retained_offset = snapshot
778 .as_ref()
779 .map(|snapshot| snapshot.offset)
780 .unwrap_or(0);
781 let updates = self
782 .message_records
783 .get(stream_id)
784 .map(|records| {
785 records
786 .iter()
787 .filter(|record| record.start_offset >= retained_offset)
788 .cloned()
789 .collect::<Vec<_>>()
790 })
791 .unwrap_or_default();
792 Ok(StreamBootstrapPlan {
793 snapshot,
794 updates,
795 next_offset: stream.tail_offset,
796 content_type: stream.content_type.clone(),
797 up_to_date: true,
798 closed: stream.status == StreamStatus::Closed,
799 })
800 }
801
    /// Publishes `payload` as the visible snapshot of `stream_id` at
    /// `snapshot_offset`, then calls `compact_retained_prefix` for that offset.
    ///
    /// The new snapshot replaces any previously visible one. If the stream has
    /// expired as of `now_ms`, its state is removed and `StreamNotFound` is
    /// returned.
    ///
    /// Other failures are returned as error responses, in check order:
    /// - scope-validation errors;
    /// - `InvalidSnapshot` for a blank (whitespace-only) content type;
    /// - `StreamNotFound`, or `StreamGone` for a soft-deleted stream;
    /// - `StreamGone` for an offset older than the earliest retained offset;
    /// - `SnapshotConflict` for an offset beyond the tail;
    /// - `InvalidSnapshot` for an offset not aligned to a committed message boundary.
    fn publish_snapshot(
        &mut self,
        stream_id: BucketStreamId,
        snapshot_offset: u64,
        content_type: String,
        payload: Vec<u8>,
        now_ms: u64,
    ) -> StreamResponse {
        if let Err(response) = self.validate_stream_scope(&stream_id) {
            return response;
        }
        if content_type.trim().is_empty() {
            return StreamResponse::error(
                StreamErrorCode::InvalidSnapshot,
                "snapshot content type must not be empty",
            );
        }
        let Some(stream) = self.streams.get(&stream_id) else {
            return StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            );
        };
        if is_soft_deleted(stream) {
            return StreamResponse::error(
                StreamErrorCode::StreamGone,
                format!("stream '{stream_id}' is gone"),
            );
        }
        // Expiry is enforced here (the method has `&mut self`), unlike read-only paths.
        if stream_is_expired(stream, now_ms) {
            self.remove_stream_state(&stream_id);
            return StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            );
        }
        let tail_offset = stream.tail_offset;
        let retained_offset = self.earliest_retained_offset(&stream_id);
        if snapshot_offset < retained_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::StreamGone,
                format!(
                    "snapshot offset {snapshot_offset} is older than stream '{}' retained offset {retained_offset}",
                    stream_id
                ),
                retained_offset,
            );
        }
        if snapshot_offset > tail_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::SnapshotConflict,
                format!(
                    "snapshot offset {snapshot_offset} is beyond stream '{}' tail {tail_offset}",
                    stream_id
                ),
                tail_offset,
            );
        }
        if !self.snapshot_offset_aligned(&stream_id, snapshot_offset, retained_offset) {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidSnapshot,
                format!(
                    "snapshot offset {snapshot_offset} is not aligned to a committed message boundary for stream '{stream_id}'"
                ),
                tail_offset,
            );
        }

        // All checks passed: replace the visible snapshot, then compact up to it.
        self.visible_snapshots.insert(
            stream_id.clone(),
            StreamVisibleSnapshot {
                offset: snapshot_offset,
                content_type,
                payload,
            },
        );
        self.compact_retained_prefix(&stream_id, snapshot_offset);
        StreamResponse::SnapshotPublished { snapshot_offset }
    }
881
    /// Moves the hot bytes in `[chunk.start_offset, chunk.end_offset)` of
    /// `stream_id` out of the payload buffer and records `chunk` as a cold chunk.
    /// NOTE(review): the caller is presumably expected to have already written the
    /// bytes to `chunk.s3_path` — confirm.
    ///
    /// The chunk must start exactly at the start of a hot segment. It may span
    /// several consecutive segments that are contiguous in both offset and
    /// payload-buffer space, and it may end partway through the last one.
    ///
    /// On success:
    /// - the bytes are drained from the payload buffer;
    /// - `remove_drained_hot_range` updates the hot segment list;
    /// - `chunk` is appended to the stream's cold chunks;
    /// - the response carries the refreshed hot start offset.
    ///
    /// `chunk.object_size` is only checked to be non-zero; it is not compared with
    /// the number of drained bytes. Expiry is not checked here. Validation failures
    /// return `InvalidColdFlush` (with the tail as `next_offset` once the stream is
    /// known), apart from scope errors, `StreamNotFound` and `StreamGone`.
    fn flush_cold(&mut self, stream_id: BucketStreamId, chunk: ColdChunkRef) -> StreamResponse {
        if let Err(response) = self.validate_stream_scope(&stream_id) {
            return response;
        }
        if chunk.s3_path.trim().is_empty() {
            return StreamResponse::error(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk S3 path must not be empty",
            );
        }
        if chunk.object_size == 0 {
            return StreamResponse::error(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk object size must be greater than zero",
            );
        }
        let Some(stream) = self.streams.get(&stream_id) else {
            return StreamResponse::error(
                StreamErrorCode::StreamNotFound,
                format!("stream '{stream_id}' does not exist"),
            );
        };
        if is_soft_deleted(stream) {
            return StreamResponse::error(
                StreamErrorCode::StreamGone,
                format!("stream '{stream_id}' is gone"),
            );
        }
        if chunk.end_offset <= chunk.start_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk must cover at least one byte",
                stream.tail_offset,
            );
        }
        if chunk.end_offset > stream.tail_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk end {} is beyond stream '{}' tail {}",
                    chunk.end_offset, stream_id, stream.tail_offset
                ),
                stream.tail_offset,
            );
        }
        let segments = self.hot_segments(&stream_id);
        // The chunk must begin exactly where some hot segment begins.
        let Some(segment_index) = segments
            .iter()
            .position(|segment| segment.start_offset == chunk.start_offset)
        else {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk for stream '{stream_id}' does not match the start of a hot payload segment"
                ),
                stream.tail_offset,
            );
        };

        // Walk forward from the matching segment, accumulating how many payload
        // bytes the chunk covers, until `chunk.end_offset` is reached.
        let drain_start = segments[segment_index].payload_start;
        let mut covered_offset = chunk.start_offset;
        let mut flush_len = 0usize;
        for segment in segments.iter().skip(segment_index) {
            // Stop at an offset gap; the coverage check after the loop reports it.
            if segment.start_offset != covered_offset {
                break;
            }
            // The chunk may end inside this segment.
            let segment_cover_end = chunk.end_offset.min(segment.end_offset);
            let segment_flush_len = match usize::try_from(segment_cover_end - segment.start_offset)
            {
                Ok(segment_flush_len) => segment_flush_len,
                Err(_) => {
                    return StreamResponse::error_with_next_offset(
                        StreamErrorCode::InvalidColdFlush,
                        "cold chunk length does not fit in memory",
                        stream.tail_offset,
                    );
                }
            };
            let Some(expected_payload_start) = drain_start.checked_add(flush_len) else {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    "cold chunk length does not fit in memory",
                    stream.tail_offset,
                );
            };
            // Segments must also be back-to-back in the payload buffer, so a single
            // `drain(drain_start..drain_end)` removes exactly the covered bytes.
            if segment.payload_start != expected_payload_start {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    format!("stream '{stream_id}' has non-contiguous hot payload metadata"),
                    stream.tail_offset,
                );
            }
            let segment_payload_len = segment.payload_end - segment.payload_start;
            if segment_flush_len > segment_payload_len {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    format!("cold chunk length exceeds stream '{stream_id}' hot segment metadata"),
                    stream.tail_offset,
                );
            }
            let Some(new_flush_len) = flush_len.checked_add(segment_flush_len) else {
                return StreamResponse::error_with_next_offset(
                    StreamErrorCode::InvalidColdFlush,
                    "cold chunk length does not fit in memory",
                    stream.tail_offset,
                );
            };
            flush_len = new_flush_len;
            covered_offset = segment_cover_end;
            if covered_offset == chunk.end_offset {
                break;
            }
        }
        if covered_offset != chunk.end_offset {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!(
                    "cold chunk for stream '{stream_id}' does not cover contiguous hot payload segments"
                ),
                stream.tail_offset,
            );
        }
        let Some(drain_end) = drain_start.checked_add(flush_len) else {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                "cold chunk length does not fit in memory",
                stream.tail_offset,
            );
        };
        let payload_len = self
            .payloads
            .get(&stream_id)
            .expect("payload vector exists for stream metadata")
            .len();
        if drain_end > payload_len {
            return StreamResponse::error_with_next_offset(
                StreamErrorCode::InvalidColdFlush,
                format!("cold chunk length exceeds stream '{stream_id}' hot payload length"),
                stream.tail_offset,
            );
        };

        // All validation passed; from here on the state is mutated.
        self.payloads
            .get_mut(&stream_id)
            .expect("payload vector exists for stream metadata")
            .drain(drain_start..drain_end);
        self.remove_drained_hot_range(
            &stream_id,
            segment_index,
            chunk.end_offset,
            drain_start,
            flush_len,
        );
        self.cold_chunks
            .entry(stream_id.clone())
            .or_default()
            .push(chunk.clone());
        self.refresh_hot_start_offset(&stream_id);
        StreamResponse::ColdFlushed {
            hot_start_offset: self.hot_start_offset(&stream_id),
        }
    }
1044
1045 fn create_bucket(&mut self, bucket_id: String) -> StreamResponse {
1046 if let Err(message) = validate_bucket_id(&bucket_id) {
1047 return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1048 }
1049 if !self.buckets.insert(bucket_id.clone()) {
1050 return StreamResponse::BucketAlreadyExists { bucket_id };
1051 }
1052 StreamResponse::BucketCreated { bucket_id }
1053 }
1054
1055 fn delete_bucket(&mut self, bucket_id: &str) -> StreamResponse {
1056 if let Err(message) = validate_bucket_id(bucket_id) {
1057 return StreamResponse::error(StreamErrorCode::InvalidBucketId, message);
1058 }
1059 if !self.buckets.contains(bucket_id) {
1060 return StreamResponse::error(
1061 StreamErrorCode::BucketNotFound,
1062 format!("bucket '{bucket_id}' does not exist"),
1063 );
1064 }
1065 if self
1066 .streams
1067 .keys()
1068 .any(|stream_id| stream_id.bucket_id == bucket_id)
1069 {
1070 return StreamResponse::error(
1071 StreamErrorCode::BucketNotEmpty,
1072 format!("bucket '{bucket_id}' is not empty"),
1073 );
1074 }
1075 self.buckets.remove(bucket_id);
1076 StreamResponse::BucketDeleted {
1077 bucket_id: bucket_id.to_owned(),
1078 }
1079 }
1080
1081 fn create_stream(&mut self, input: CreateStreamInput) -> StreamResponse {
1082 if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1083 return response;
1084 }
1085 if let Err(response) =
1086 validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1087 {
1088 return response;
1089 }
1090 if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1091 return response;
1092 }
1093 if let Some(producer) = input.producer.as_ref()
1094 && producer.producer_seq != 0
1095 {
1096 return StreamResponse::error(
1097 StreamErrorCode::ProducerSeqConflict,
1098 format!(
1099 "producer '{}' expected sequence 0, received {}",
1100 producer.producer_id, producer.producer_seq
1101 ),
1102 );
1103 }
1104 if self
1105 .streams
1106 .get(&input.stream_id)
1107 .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1108 {
1109 self.remove_stream_state(&input.stream_id);
1110 }
1111
1112 if let Some(existing) = self.streams.get(&input.stream_id) {
1113 if is_soft_deleted(existing) {
1114 return StreamResponse::error(
1115 StreamErrorCode::StreamAlreadyExistsConflict,
1116 format!(
1117 "stream '{}' is gone and cannot be recreated yet",
1118 input.stream_id
1119 ),
1120 );
1121 }
1122 if existing.content_type == input.content_type
1123 && existing.status == status_from_closed(input.close_after)
1124 && existing.stream_ttl_seconds == input.stream_ttl_seconds
1125 && existing.stream_expires_at_ms == input.stream_expires_at_ms
1126 && existing.forked_from == input.forked_from
1127 && existing.fork_offset == input.fork_offset
1128 {
1129 return StreamResponse::AlreadyExists {
1130 next_offset: existing.tail_offset,
1131 closed: existing.status == StreamStatus::Closed,
1132 content_type: existing.content_type.clone(),
1133 stream_ttl_seconds: existing.stream_ttl_seconds,
1134 stream_expires_at_ms: existing.stream_expires_at_ms,
1135 };
1136 }
1137 return StreamResponse::error(
1138 StreamErrorCode::StreamAlreadyExistsConflict,
1139 format!(
1140 "stream '{}' already exists with different metadata",
1141 input.stream_id
1142 ),
1143 );
1144 }
1145
1146 let initial_len = input.initial_len();
1147 let metadata = StreamMetadata {
1148 stream_id: input.stream_id.clone(),
1149 content_type: input.content_type,
1150 status: status_from_closed(input.close_after),
1151 tail_offset: initial_len,
1152 last_stream_seq: input.stream_seq,
1153 stream_ttl_seconds: input.stream_ttl_seconds,
1154 stream_expires_at_ms: input.stream_expires_at_ms,
1155 created_at_ms: input.now_ms,
1156 last_ttl_touch_at_ms: input.now_ms,
1157 forked_from: input.forked_from,
1158 fork_offset: input.fork_offset,
1159 fork_ref_count: 0,
1160 };
1161 self.streams.insert(input.stream_id.clone(), metadata);
1162 self.payloads
1163 .insert(input.stream_id.clone(), input.initial_payload);
1164 if initial_len > 0 {
1165 self.hot_segments.insert(
1166 input.stream_id.clone(),
1167 vec![HotPayloadSegment {
1168 start_offset: 0,
1169 end_offset: initial_len,
1170 payload_start: 0,
1171 payload_end: usize::try_from(initial_len).expect("payload len fits usize"),
1172 }],
1173 );
1174 self.message_records.insert(
1175 input.stream_id.clone(),
1176 vec![StreamMessageRecord {
1177 start_offset: 0,
1178 end_offset: initial_len,
1179 }],
1180 );
1181 }
1182 let mut producer_states = HashMap::new();
1183 if let Some(producer) = input.producer {
1184 let last_item = ProducerAppendRecord {
1185 start_offset: 0,
1186 next_offset: initial_len,
1187 closed: input.close_after,
1188 };
1189 producer_states.insert(
1190 producer.producer_id,
1191 ProducerState {
1192 producer_epoch: producer.producer_epoch,
1193 producer_seq: producer.producer_seq,
1194 last_start_offset: last_item.start_offset,
1195 last_next_offset: last_item.next_offset,
1196 last_closed: last_item.closed,
1197 last_items: vec![last_item],
1198 },
1199 );
1200 }
1201 self.producers
1202 .insert(input.stream_id.clone(), producer_states);
1203 StreamResponse::Created {
1204 stream_id: input.stream_id,
1205 next_offset: initial_len,
1206 closed: input.close_after,
1207 }
1208 }
1209
1210 fn create_external_stream(&mut self, input: CreateExternalStreamInput) -> StreamResponse {
1211 if let Err(response) = validate_external_payload_ref(&input.initial_payload) {
1212 return response;
1213 }
1214 if let Err(response) = self.validate_stream_scope(&input.stream_id) {
1215 return response;
1216 }
1217 if let Err(response) =
1218 validate_retention(input.stream_ttl_seconds, input.stream_expires_at_ms)
1219 {
1220 return response;
1221 }
1222 if let Err(response) = validate_producer_request(input.producer.as_ref()) {
1223 return response;
1224 }
1225 if let Some(producer) = input.producer.as_ref()
1226 && producer.producer_seq != 0
1227 {
1228 return StreamResponse::error(
1229 StreamErrorCode::ProducerSeqConflict,
1230 format!(
1231 "producer '{}' expected sequence 0, received {}",
1232 producer.producer_id, producer.producer_seq
1233 ),
1234 );
1235 }
1236 if self
1237 .streams
1238 .get(&input.stream_id)
1239 .is_some_and(|existing| stream_is_expired(existing, input.now_ms))
1240 {
1241 self.remove_stream_state(&input.stream_id);
1242 }
1243
1244 if let Some(existing) = self.streams.get(&input.stream_id) {
1245 if is_soft_deleted(existing) {
1246 return StreamResponse::error(
1247 StreamErrorCode::StreamAlreadyExistsConflict,
1248 format!(
1249 "stream '{}' is gone and cannot be recreated yet",
1250 input.stream_id
1251 ),
1252 );
1253 }
1254 if existing.content_type == input.content_type
1255 && existing.status == status_from_closed(input.close_after)
1256 && existing.stream_ttl_seconds == input.stream_ttl_seconds
1257 && existing.stream_expires_at_ms == input.stream_expires_at_ms
1258 && existing.forked_from == input.forked_from
1259 && existing.fork_offset == input.fork_offset
1260 {
1261 return StreamResponse::AlreadyExists {
1262 next_offset: existing.tail_offset,
1263 closed: existing.status == StreamStatus::Closed,
1264 content_type: existing.content_type.clone(),
1265 stream_ttl_seconds: existing.stream_ttl_seconds,
1266 stream_expires_at_ms: existing.stream_expires_at_ms,
1267 };
1268 }
1269 return StreamResponse::error(
1270 StreamErrorCode::StreamAlreadyExistsConflict,
1271 format!(
1272 "stream '{}' already exists with different metadata",
1273 input.stream_id
1274 ),
1275 );
1276 }
1277
1278 let initial_len = input.initial_payload.payload_len;
1279 let metadata = StreamMetadata {
1280 stream_id: input.stream_id.clone(),
1281 content_type: input.content_type,
1282 status: status_from_closed(input.close_after),
1283 tail_offset: initial_len,
1284 last_stream_seq: input.stream_seq,
1285 stream_ttl_seconds: input.stream_ttl_seconds,
1286 stream_expires_at_ms: input.stream_expires_at_ms,
1287 created_at_ms: input.now_ms,
1288 last_ttl_touch_at_ms: input.now_ms,
1289 forked_from: input.forked_from,
1290 fork_offset: input.fork_offset,
1291 fork_ref_count: 0,
1292 };
1293 self.streams.insert(input.stream_id.clone(), metadata);
1294 self.payloads.insert(input.stream_id.clone(), Vec::new());
1295 self.external_segments.insert(
1296 input.stream_id.clone(),
1297 vec![ObjectPayloadRef {
1298 start_offset: 0,
1299 end_offset: initial_len,
1300 s3_path: input.initial_payload.s3_path,
1301 object_size: input.initial_payload.object_size,
1302 }],
1303 );
1304 self.message_records.insert(
1305 input.stream_id.clone(),
1306 vec![StreamMessageRecord {
1307 start_offset: 0,
1308 end_offset: initial_len,
1309 }],
1310 );
1311 let mut producer_states = HashMap::new();
1312 if let Some(producer) = input.producer {
1313 let last_item = ProducerAppendRecord {
1314 start_offset: 0,
1315 next_offset: initial_len,
1316 closed: input.close_after,
1317 };
1318 producer_states.insert(
1319 producer.producer_id,
1320 ProducerState {
1321 producer_epoch: producer.producer_epoch,
1322 producer_seq: producer.producer_seq,
1323 last_start_offset: last_item.start_offset,
1324 last_next_offset: last_item.next_offset,
1325 last_closed: last_item.closed,
1326 last_items: vec![last_item],
1327 },
1328 );
1329 }
1330 self.producers
1331 .insert(input.stream_id.clone(), producer_states);
1332 StreamResponse::Created {
1333 stream_id: input.stream_id,
1334 next_offset: initial_len,
1335 closed: input.close_after,
1336 }
1337 }
1338
1339 pub fn append_borrowed(&mut self, input: AppendStreamInput<'_>) -> StreamResponse {
1340 let AppendStreamInput {
1341 stream_id,
1342 content_type,
1343 payload,
1344 close_after,
1345 stream_seq,
1346 producer,
1347 now_ms,
1348 } = input;
1349 if let Err(response) = self.validate_stream_scope(&stream_id) {
1350 return response;
1351 }
1352 if let Err(response) = validate_producer_request(producer.as_ref()) {
1353 return response;
1354 }
1355
1356 let Some(_) = self.streams.get(&stream_id) else {
1357 return StreamResponse::error(
1358 StreamErrorCode::StreamNotFound,
1359 format!("stream '{stream_id}' does not exist"),
1360 );
1361 };
1362 if self.expire_stream_if_due(&stream_id, now_ms) {
1363 return StreamResponse::error(
1364 StreamErrorCode::StreamNotFound,
1365 format!("stream '{stream_id}' does not exist"),
1366 );
1367 }
1368 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1369 return StreamResponse::error(
1370 StreamErrorCode::StreamGone,
1371 format!("stream '{stream_id}' is gone"),
1372 );
1373 }
1374 let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1375 Ok(decision) => decision,
1376 Err(response) => return response,
1377 };
1378 if let ProducerDecision::Duplicate {
1379 offset,
1380 next_offset,
1381 closed,
1382 producer,
1383 ..
1384 } = producer_decision
1385 {
1386 if payload.is_empty() {
1387 return StreamResponse::Closed {
1388 next_offset,
1389 deduplicated: true,
1390 producer: Some(producer),
1391 };
1392 }
1393 return StreamResponse::Appended {
1394 offset,
1395 next_offset,
1396 closed,
1397 deduplicated: true,
1398 producer: Some(producer),
1399 };
1400 }
1401
1402 let Some(stream) = self.streams.get_mut(&stream_id) else {
1403 unreachable!("stream existence checked before producer evaluation");
1404 };
1405
1406 if stream.status == StreamStatus::Closed {
1407 if close_after && payload.is_empty() {
1408 return StreamResponse::Closed {
1409 next_offset: stream.tail_offset,
1410 deduplicated: false,
1411 producer: None,
1412 };
1413 }
1414 return StreamResponse::error_with_next_offset(
1415 StreamErrorCode::StreamClosed,
1416 format!("stream '{stream_id}' is closed"),
1417 stream.tail_offset,
1418 );
1419 }
1420
1421 if payload.is_empty() && !close_after {
1422 return StreamResponse::error(
1423 StreamErrorCode::EmptyAppend,
1424 "append payload must be non-empty unless closing the stream",
1425 );
1426 }
1427
1428 if !payload.is_empty() {
1429 let Some(content_type) = content_type else {
1430 return StreamResponse::error(
1431 StreamErrorCode::MissingContentType,
1432 "append with a body must include content type",
1433 );
1434 };
1435 if content_type != stream.content_type {
1436 return StreamResponse::error_with_next_offset(
1437 StreamErrorCode::ContentTypeMismatch,
1438 format!(
1439 "append content type '{content_type}' does not match stream content type '{}'",
1440 stream.content_type
1441 ),
1442 stream.tail_offset,
1443 );
1444 }
1445 }
1446
1447 if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1448 return response;
1449 }
1450
1451 let offset = stream.tail_offset;
1452 let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1453 stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1454 if let Some(seq) = stream_seq {
1455 stream.last_stream_seq = Some(seq);
1456 }
1457 renew_stream_ttl(stream, now_ms);
1458 if close_after {
1459 stream.status = StreamStatus::Closed;
1460 }
1461 let closed = stream.status == StreamStatus::Closed;
1462 let next_offset = stream.tail_offset;
1463 let producer_ack = producer.clone();
1464 if let Some(producer) = producer {
1465 self.record_producer_success(
1466 stream_id.clone(),
1467 producer,
1468 ProducerAppendRecord {
1469 start_offset: offset,
1470 next_offset,
1471 closed,
1472 },
1473 vec![ProducerAppendRecord {
1474 start_offset: offset,
1475 next_offset,
1476 closed,
1477 }],
1478 );
1479 }
1480
1481 if payload.is_empty() {
1482 StreamResponse::Closed {
1483 next_offset,
1484 deduplicated: false,
1485 producer: producer_ack,
1486 }
1487 } else {
1488 let payload_store = self
1489 .payloads
1490 .get_mut(&stream_id)
1491 .expect("payload vector exists for stream metadata");
1492 let payload_start = payload_store.len();
1493 payload_store.extend_from_slice(payload);
1494 let payload_end = payload_store.len();
1495 self.hot_segments
1496 .get_mut(&stream_id)
1497 .map(|segments| {
1498 segments.push(HotPayloadSegment {
1499 start_offset: offset,
1500 end_offset: next_offset,
1501 payload_start,
1502 payload_end,
1503 })
1504 })
1505 .unwrap_or_else(|| {
1506 self.hot_segments.insert(
1507 stream_id.clone(),
1508 vec![HotPayloadSegment {
1509 start_offset: offset,
1510 end_offset: next_offset,
1511 payload_start,
1512 payload_end,
1513 }],
1514 );
1515 });
1516 self.refresh_hot_start_offset(&stream_id);
1517 self.message_records
1518 .entry(stream_id.clone())
1519 .or_default()
1520 .push(StreamMessageRecord {
1521 start_offset: offset,
1522 end_offset: next_offset,
1523 });
1524 StreamResponse::Appended {
1525 offset,
1526 next_offset,
1527 closed: close_after,
1528 deduplicated: false,
1529 producer: producer_ack,
1530 }
1531 }
1532 }
1533
1534 fn append_external(&mut self, input: AppendExternalInput<'_>) -> StreamResponse {
1535 let AppendExternalInput {
1536 stream_id,
1537 content_type,
1538 payload,
1539 close_after,
1540 stream_seq,
1541 producer,
1542 now_ms,
1543 } = input;
1544 if let Err(response) = validate_external_payload_ref(&payload) {
1545 return response;
1546 }
1547 if let Err(response) = self.validate_stream_scope(&stream_id) {
1548 return response;
1549 }
1550 if let Err(response) = validate_producer_request(producer.as_ref()) {
1551 return response;
1552 }
1553 let Some(_) = self.streams.get(&stream_id) else {
1554 return StreamResponse::error(
1555 StreamErrorCode::StreamNotFound,
1556 format!("stream '{stream_id}' does not exist"),
1557 );
1558 };
1559 if self.expire_stream_if_due(&stream_id, now_ms) {
1560 return StreamResponse::error(
1561 StreamErrorCode::StreamNotFound,
1562 format!("stream '{stream_id}' does not exist"),
1563 );
1564 }
1565 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1566 return StreamResponse::error(
1567 StreamErrorCode::StreamGone,
1568 format!("stream '{stream_id}' is gone"),
1569 );
1570 }
1571 let producer_decision = match self.evaluate_producer(&stream_id, producer.as_ref()) {
1572 Ok(decision) => decision,
1573 Err(response) => return response,
1574 };
1575 if let ProducerDecision::Duplicate {
1576 offset,
1577 next_offset,
1578 closed,
1579 producer,
1580 ..
1581 } = producer_decision
1582 {
1583 return StreamResponse::Appended {
1584 offset,
1585 next_offset,
1586 closed,
1587 deduplicated: true,
1588 producer: Some(producer),
1589 };
1590 }
1591
1592 let Some(stream) = self.streams.get(&stream_id) else {
1593 unreachable!("stream existence checked before producer evaluation");
1594 };
1595 if stream.status == StreamStatus::Closed {
1596 return StreamResponse::error_with_next_offset(
1597 StreamErrorCode::StreamClosed,
1598 format!("stream '{stream_id}' is closed"),
1599 stream.tail_offset,
1600 );
1601 }
1602 let Some(content_type) = content_type else {
1603 return StreamResponse::error(
1604 StreamErrorCode::MissingContentType,
1605 "append with a body must include content type",
1606 );
1607 };
1608 if content_type != stream.content_type {
1609 return StreamResponse::error_with_next_offset(
1610 StreamErrorCode::ContentTypeMismatch,
1611 format!(
1612 "append content type '{content_type}' does not match stream content type '{}'",
1613 stream.content_type
1614 ),
1615 stream.tail_offset,
1616 );
1617 }
1618 if let Err(response) = check_stream_seq(stream, stream_seq.as_deref()) {
1619 return response;
1620 }
1621 let offset = stream.tail_offset;
1622 let next_offset = offset.saturating_add(payload.payload_len);
1623 let stream = self
1624 .streams
1625 .get_mut(&stream_id)
1626 .expect("stream existence checked before external append mutation");
1627 stream.tail_offset = next_offset;
1628 if let Some(seq) = stream_seq {
1629 stream.last_stream_seq = Some(seq);
1630 }
1631 renew_stream_ttl(stream, now_ms);
1632 if close_after {
1633 stream.status = StreamStatus::Closed;
1634 }
1635 let closed = stream.status == StreamStatus::Closed;
1636 let producer_ack = producer.clone();
1637 if let Some(producer) = producer {
1638 self.record_producer_success(
1639 stream_id.clone(),
1640 producer,
1641 ProducerAppendRecord {
1642 start_offset: offset,
1643 next_offset,
1644 closed,
1645 },
1646 vec![ProducerAppendRecord {
1647 start_offset: offset,
1648 next_offset,
1649 closed,
1650 }],
1651 );
1652 }
1653 self.external_segments
1654 .entry(stream_id.clone())
1655 .or_default()
1656 .push(ObjectPayloadRef {
1657 start_offset: offset,
1658 end_offset: next_offset,
1659 s3_path: payload.s3_path,
1660 object_size: payload.object_size,
1661 });
1662 self.message_records
1663 .entry(stream_id.clone())
1664 .or_default()
1665 .push(StreamMessageRecord {
1666 start_offset: offset,
1667 end_offset: next_offset,
1668 });
1669 StreamResponse::Appended {
1670 offset,
1671 next_offset,
1672 closed: close_after,
1673 deduplicated: false,
1674 producer: producer_ack,
1675 }
1676 }
1677
1678 pub fn append_batch_borrowed(
1679 &mut self,
1680 stream_id: BucketStreamId,
1681 content_type: Option<&str>,
1682 payloads: &[&[u8]],
1683 producer: Option<ProducerRequest>,
1684 now_ms: u64,
1685 ) -> Result<StreamBatchAppend, StreamResponse> {
1686 if payloads.is_empty() {
1687 return Err(StreamResponse::error(
1688 StreamErrorCode::EmptyAppend,
1689 "append batch must contain at least one payload",
1690 ));
1691 }
1692 self.validate_stream_scope(&stream_id)?;
1693 validate_producer_request(producer.as_ref())?;
1694 if self.expire_stream_if_due(&stream_id, now_ms) {
1695 return Err(StreamResponse::error(
1696 StreamErrorCode::StreamNotFound,
1697 format!("stream '{stream_id}' does not exist"),
1698 ));
1699 }
1700 if self.streams.get(&stream_id).is_some_and(is_soft_deleted) {
1701 return Err(StreamResponse::error(
1702 StreamErrorCode::StreamGone,
1703 format!("stream '{stream_id}' is gone"),
1704 ));
1705 }
1706 let producer_decision = self.evaluate_producer(&stream_id, producer.as_ref())?;
1707 if let ProducerDecision::Duplicate { items, .. } = producer_decision {
1708 return Ok(StreamBatchAppend {
1709 items: items
1710 .into_iter()
1711 .map(|item| StreamBatchAppendItem {
1712 offset: item.start_offset,
1713 next_offset: item.next_offset,
1714 closed: item.closed,
1715 deduplicated: true,
1716 })
1717 .collect(),
1718 deduplicated: true,
1719 });
1720 }
1721
1722 let Some(stream) = self.streams.get_mut(&stream_id) else {
1723 return Err(StreamResponse::error(
1724 StreamErrorCode::StreamNotFound,
1725 format!("stream '{stream_id}' does not exist"),
1726 ));
1727 };
1728 if stream.status == StreamStatus::Closed {
1729 return Err(StreamResponse::error_with_next_offset(
1730 StreamErrorCode::StreamClosed,
1731 format!("stream '{stream_id}' is closed"),
1732 stream.tail_offset,
1733 ));
1734 }
1735 let Some(content_type) = content_type else {
1736 return Err(StreamResponse::error(
1737 StreamErrorCode::MissingContentType,
1738 "append batch must include content type",
1739 ));
1740 };
1741 if content_type != stream.content_type {
1742 return Err(StreamResponse::error_with_next_offset(
1743 StreamErrorCode::ContentTypeMismatch,
1744 format!(
1745 "append content type '{content_type}' does not match stream content type '{}'",
1746 stream.content_type
1747 ),
1748 stream.tail_offset,
1749 ));
1750 }
1751 if payloads.iter().any(|payload| payload.is_empty()) {
1752 return Err(StreamResponse::error(
1753 StreamErrorCode::EmptyAppend,
1754 "append batch payloads must be non-empty",
1755 ));
1756 }
1757
1758 let mut items = Vec::with_capacity(payloads.len());
1759 for payload in payloads {
1760 let offset = stream.tail_offset;
1761 let payload_len = u64::try_from(payload.len()).expect("payload len fits u64");
1762 stream.tail_offset = stream.tail_offset.saturating_add(payload_len);
1763 items.push(ProducerAppendRecord {
1764 start_offset: offset,
1765 next_offset: stream.tail_offset,
1766 closed: false,
1767 });
1768 }
1769 let last = items
1770 .last()
1771 .expect("payloads checked non-empty before append")
1772 .clone();
1773 renew_stream_ttl(stream, now_ms);
1774 if let Some(producer) = producer {
1775 self.record_producer_success(stream_id.clone(), producer, last.clone(), items.clone());
1776 }
1777 let payload_store = self
1778 .payloads
1779 .get_mut(&stream_id)
1780 .expect("payload vector exists for stream metadata");
1781 let hot_segments = self.hot_segments.entry(stream_id.clone()).or_default();
1782 for (item, payload) in items.iter().zip(payloads.iter()) {
1783 let payload_start = payload_store.len();
1784 payload_store.extend_from_slice(payload);
1785 let payload_end = payload_store.len();
1786 hot_segments.push(HotPayloadSegment {
1787 start_offset: item.start_offset,
1788 end_offset: item.next_offset,
1789 payload_start,
1790 payload_end,
1791 });
1792 }
1793 self.refresh_hot_start_offset(&stream_id);
1794 self.message_records
1795 .entry(stream_id.clone())
1796 .or_default()
1797 .extend(items.iter().map(|item| StreamMessageRecord {
1798 start_offset: item.start_offset,
1799 end_offset: item.next_offset,
1800 }));
1801 Ok(StreamBatchAppend {
1802 items: items
1803 .into_iter()
1804 .map(|item| StreamBatchAppendItem {
1805 offset: item.start_offset,
1806 next_offset: item.next_offset,
1807 closed: item.closed,
1808 deduplicated: false,
1809 })
1810 .collect(),
1811 deduplicated: false,
1812 })
1813 }
1814
1815 fn close(
1816 &mut self,
1817 stream_id: BucketStreamId,
1818 stream_seq: Option<String>,
1819 producer: Option<ProducerRequest>,
1820 now_ms: u64,
1821 ) -> StreamResponse {
1822 self.append_borrowed(AppendStreamInput {
1823 stream_id,
1824 content_type: None,
1825 payload: &[],
1826 close_after: true,
1827 stream_seq,
1828 producer,
1829 now_ms,
1830 })
1831 }
1832
1833 fn delete_stream(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
1834 if let Err(response) = self.validate_stream_scope(stream_id) {
1835 return response;
1836 }
1837 let Some(stream) = self.streams.get_mut(stream_id) else {
1838 return StreamResponse::error(
1839 StreamErrorCode::StreamNotFound,
1840 format!("stream '{stream_id}' does not exist"),
1841 );
1842 };
1843 if is_soft_deleted(stream) {
1844 return StreamResponse::error(
1845 StreamErrorCode::StreamGone,
1846 format!("stream '{stream_id}' is gone"),
1847 );
1848 }
1849 if stream.fork_ref_count > 0 {
1850 stream.status = StreamStatus::SoftDeleted;
1851 return StreamResponse::Deleted {
1852 hard_deleted: false,
1853 parent_to_release: None,
1854 };
1855 }
1856 let parent_to_release = stream.forked_from.clone();
1857 self.remove_stream_state(stream_id);
1858 StreamResponse::Deleted {
1859 hard_deleted: true,
1860 parent_to_release,
1861 }
1862 }
1863
1864 fn add_fork_ref(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> StreamResponse {
1865 if let Err(response) = self.validate_stream_scope(stream_id) {
1866 return response;
1867 }
1868 if self.expire_stream_if_due(stream_id, now_ms) {
1869 return StreamResponse::error(
1870 StreamErrorCode::StreamNotFound,
1871 format!("stream '{stream_id}' does not exist"),
1872 );
1873 }
1874 let Some(stream) = self.streams.get_mut(stream_id) else {
1875 return StreamResponse::error(
1876 StreamErrorCode::StreamNotFound,
1877 format!("stream '{stream_id}' does not exist"),
1878 );
1879 };
1880 if is_soft_deleted(stream) {
1881 return StreamResponse::error(
1882 StreamErrorCode::StreamGone,
1883 format!("stream '{stream_id}' is gone"),
1884 );
1885 }
1886 stream.fork_ref_count = stream.fork_ref_count.saturating_add(1);
1887 StreamResponse::ForkRefAdded {
1888 fork_ref_count: stream.fork_ref_count,
1889 }
1890 }
1891
1892 fn release_fork_ref(&mut self, stream_id: &BucketStreamId) -> StreamResponse {
1893 if let Err(response) = self.validate_stream_scope(stream_id) {
1894 return response;
1895 }
1896 let Some(stream) = self.streams.get_mut(stream_id) else {
1897 return StreamResponse::ForkRefReleased {
1898 hard_deleted: false,
1899 fork_ref_count: 0,
1900 parent_to_release: None,
1901 };
1902 };
1903 if stream.fork_ref_count == 0 {
1904 return StreamResponse::error(
1905 StreamErrorCode::InvalidFork,
1906 format!("stream '{stream_id}' has no fork reference to release"),
1907 );
1908 }
1909 stream.fork_ref_count -= 1;
1910 if stream.fork_ref_count == 0 && is_soft_deleted(stream) {
1911 let parent_to_release = stream.forked_from.clone();
1912 self.remove_stream_state(stream_id);
1913 return StreamResponse::ForkRefReleased {
1914 hard_deleted: true,
1915 fork_ref_count: 0,
1916 parent_to_release,
1917 };
1918 }
1919 StreamResponse::ForkRefReleased {
1920 hard_deleted: false,
1921 fork_ref_count: stream.fork_ref_count,
1922 parent_to_release: None,
1923 }
1924 }
1925
1926 fn touch_stream_access(
1927 &mut self,
1928 stream_id: &BucketStreamId,
1929 now_ms: u64,
1930 renew_ttl: bool,
1931 ) -> StreamResponse {
1932 if let Err(response) = self.validate_stream_scope(stream_id) {
1933 return response;
1934 }
1935 let Some(stream) = self.streams.get(stream_id) else {
1936 return StreamResponse::error(
1937 StreamErrorCode::StreamNotFound,
1938 format!("stream '{stream_id}' does not exist"),
1939 );
1940 };
1941 if is_soft_deleted(stream) {
1942 return StreamResponse::error(
1943 StreamErrorCode::StreamGone,
1944 format!("stream '{stream_id}' is gone"),
1945 );
1946 }
1947 if stream_is_expired(stream, now_ms) {
1948 self.remove_stream_state(stream_id);
1949 return StreamResponse::Accessed {
1950 changed: true,
1951 expired: true,
1952 };
1953 }
1954 let changed = if renew_ttl && stream.stream_ttl_seconds.is_some() {
1955 let stream = self
1956 .streams
1957 .get_mut(stream_id)
1958 .expect("stream existence checked before TTL renewal");
1959 let previous = stream.last_ttl_touch_at_ms;
1960 renew_stream_ttl(stream, now_ms);
1961 stream.last_ttl_touch_at_ms != previous
1962 } else {
1963 false
1964 };
1965 StreamResponse::Accessed {
1966 changed,
1967 expired: false,
1968 }
1969 }
1970
1971 fn expire_stream_if_due(&mut self, stream_id: &BucketStreamId, now_ms: u64) -> bool {
1972 if self
1973 .streams
1974 .get(stream_id)
1975 .is_some_and(|stream| stream_is_expired(stream, now_ms))
1976 {
1977 self.remove_stream_state(stream_id);
1978 return true;
1979 }
1980 false
1981 }
1982
1983 fn remove_stream_state(&mut self, stream_id: &BucketStreamId) -> bool {
1984 if self.streams.remove(stream_id).is_some() {
1985 self.payloads.remove(stream_id);
1986 self.hot_segments.remove(stream_id);
1987 self.hot_start_offsets.remove(stream_id);
1988 self.cold_chunks.remove(stream_id);
1989 self.external_segments.remove(stream_id);
1990 self.message_records.remove(stream_id);
1991 self.visible_snapshots.remove(stream_id);
1992 self.producers.remove(stream_id);
1993 true
1994 } else {
1995 false
1996 }
1997 }
1998
1999 fn validate_stream_scope(&self, stream_id: &BucketStreamId) -> Result<(), StreamResponse> {
2000 if let Err(message) = validate_bucket_id(&stream_id.bucket_id) {
2001 return Err(StreamResponse::error(
2002 StreamErrorCode::InvalidBucketId,
2003 message,
2004 ));
2005 }
2006 if let Err(message) = validate_stream_id(stream_id) {
2007 return Err(StreamResponse::error(
2008 StreamErrorCode::InvalidStreamId,
2009 message,
2010 ));
2011 }
2012 if !self.buckets.contains(&stream_id.bucket_id) {
2013 return Err(StreamResponse::error(
2014 StreamErrorCode::BucketNotFound,
2015 format!("bucket '{}' does not exist", stream_id.bucket_id),
2016 ));
2017 }
2018 Ok(())
2019 }
2020
2021 fn earliest_retained_offset(&self, stream_id: &BucketStreamId) -> u64 {
2022 self.visible_snapshots
2023 .get(stream_id)
2024 .map(|snapshot| snapshot.offset)
2025 .unwrap_or(0)
2026 }
2027
2028 fn snapshot_offset_aligned(
2029 &self,
2030 stream_id: &BucketStreamId,
2031 snapshot_offset: u64,
2032 retained_offset: u64,
2033 ) -> bool {
2034 snapshot_offset == retained_offset
2035 || self.message_records.get(stream_id).is_some_and(|records| {
2036 records
2037 .iter()
2038 .any(|record| record.end_offset == snapshot_offset)
2039 })
2040 }
2041
2042 fn compact_retained_prefix(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
2043 if let Some(records) = self.message_records.get_mut(stream_id) {
2044 records.retain(|record| record.end_offset > retained_offset);
2045 if records.is_empty() {
2046 self.message_records.remove(stream_id);
2047 }
2048 }
2049 if let Some(chunks) = self.cold_chunks.get_mut(stream_id) {
2050 chunks.retain(|chunk| chunk.end_offset > retained_offset);
2051 if chunks.is_empty() {
2052 self.cold_chunks.remove(stream_id);
2053 }
2054 }
2055 if let Some(objects) = self.external_segments.get_mut(stream_id) {
2056 objects.retain(|object| object.end_offset > retained_offset);
2057 if objects.is_empty() {
2058 self.external_segments.remove(stream_id);
2059 }
2060 }
2061
2062 self.discard_hot_prefix_before(stream_id, retained_offset);
2063 }
2064
2065 fn refresh_hot_start_offset(&mut self, stream_id: &BucketStreamId) {
2066 let Some(hot_start_offset) = self
2067 .hot_segments
2068 .get(stream_id)
2069 .and_then(|segments| segments.iter().map(|segment| segment.start_offset).min())
2070 else {
2071 self.hot_start_offsets.remove(stream_id);
2072 return;
2073 };
2074 if hot_start_offset == 0 {
2075 self.hot_start_offsets.remove(stream_id);
2076 } else {
2077 self.hot_start_offsets
2078 .insert(stream_id.clone(), hot_start_offset);
2079 }
2080 }
2081
    /// Re-indexes a stream's hot segments after `drained_len` bytes starting at `drain_start`
    /// were removed from its payload buffer.
    ///
    /// Per segment (in vector order):
    /// - index below `segment_index`, or payload ending at or before `drain_start`: kept as is;
    /// - payload starting at or after the drained range's end: shifted left by `drained_len`;
    /// - payload lying entirely inside the drained range: dropped;
    /// - otherwise (payload extends past the drained range's end): its logical start becomes
    ///   `new_start_offset` and its payload window is re-based to begin at `drain_start`.
    ///
    /// The stream's `hot_segments` entry is removed when no segments remain, and the cached hot
    /// start offset is refreshed on every path.
    ///
    /// NOTE(review): the last case assumes the drain began at that segment's `payload_start`
    /// (a prefix drain), as `discard_hot_prefix_before` guarantees. A segment straddling
    /// `drain_start` would lose the mapping of its bytes before `drain_start`. Confirm that
    /// every caller upholds this.
    fn remove_drained_hot_range(
        &mut self,
        stream_id: &BucketStreamId,
        segment_index: usize,
        new_start_offset: u64,
        drain_start: usize,
        drained_len: usize,
    ) {
        let Some(segments) = self.hot_segments.get_mut(stream_id) else {
            self.hot_start_offsets.remove(stream_id);
            return;
        };
        if segment_index >= segments.len() {
            self.refresh_hot_start_offset(stream_id);
            return;
        }
        let drain_end = drain_start + drained_len;
        let mut updated_segments = Vec::with_capacity(segments.len());
        for (index, mut segment) in segments.drain(..).enumerate() {
            // Entirely before the drained bytes: unaffected.
            if index < segment_index || segment.payload_end <= drain_start {
                updated_segments.push(segment);
                continue;
            }
            // Entirely after the drained bytes: only the buffer positions move.
            if segment.payload_start >= drain_end {
                segment.payload_start -= drained_len;
                segment.payload_end -= drained_len;
                updated_segments.push(segment);
                continue;
            }
            // Fully covered by the drained bytes: the segment disappears.
            if segment.payload_end <= drain_end {
                continue;
            }
            // Partially drained: keep the surviving tail.
            segment.start_offset = new_start_offset;
            segment.payload_start = drain_start;
            segment.payload_end -= drained_len;
            updated_segments.push(segment);
        }
        *segments = updated_segments;
        if segments.is_empty() {
            self.hot_segments.remove(stream_id);
        }
        self.refresh_hot_start_offset(stream_id);
    }
2125
    /// Drops hot (in-memory) payload bytes for offsets below `retained_offset`.
    ///
    /// Each iteration picks the first hot segment that starts before `retained_offset`. It
    /// drains the bytes for `[segment.start_offset, min(retained_offset, segment.end_offset))`
    /// from the front of that segment's payload window, then re-indexes the remaining segments
    /// via `remove_drained_hot_range`. The loop stops when no segment starts before
    /// `retained_offset`, or when it meets a zero-length segment.
    ///
    /// Offset spans are used directly as byte counts. This relies on every hot segment's offset
    /// span equalling its payload byte span, which holds for the segments built in this file.
    fn discard_hot_prefix_before(&mut self, stream_id: &BucketStreamId, retained_offset: u64) {
        while let Some(segment_index) = self
            .hot_segments(stream_id)
            .iter()
            .position(|segment| segment.start_offset < retained_offset)
        {
            let segment = self.hot_segments(stream_id)[segment_index].clone();
            // Trim up to the retention point, or the whole segment if it ends earlier.
            let new_start_offset = retained_offset.min(segment.end_offset);
            let drained_len = usize::try_from(new_start_offset - segment.start_offset)
                .expect("drain len fits usize");
            // A zero-length segment would never make progress; stop instead of looping forever.
            if drained_len == 0 {
                break;
            }
            let drain_start = segment.payload_start;
            let drain_end = drain_start + drained_len;
            self.payloads
                .get_mut(stream_id)
                .expect("payload vector exists for stream metadata")
                .drain(drain_start..drain_end);
            self.remove_drained_hot_range(
                stream_id,
                segment_index,
                new_start_offset,
                drain_start,
                drained_len,
            );
        }
        self.refresh_hot_start_offset(stream_id);
    }
2155
2156 fn producer_snapshot(&self, stream_id: &BucketStreamId) -> Vec<ProducerSnapshot> {
2157 let mut producer_states = self
2158 .producers
2159 .get(stream_id)
2160 .into_iter()
2161 .flat_map(|states| states.iter())
2162 .map(|(producer_id, state)| ProducerSnapshot {
2163 producer_id: producer_id.clone(),
2164 producer_epoch: state.producer_epoch,
2165 producer_seq: state.producer_seq,
2166 last_start_offset: state.last_start_offset,
2167 last_next_offset: state.last_next_offset,
2168 last_closed: state.last_closed,
2169 last_items: state.last_items.clone(),
2170 })
2171 .collect::<Vec<_>>();
2172 producer_states.sort_by(|left, right| left.producer_id.cmp(&right.producer_id));
2173 producer_states
2174 }
2175
2176 fn evaluate_producer(
2177 &self,
2178 stream_id: &BucketStreamId,
2179 producer: Option<&ProducerRequest>,
2180 ) -> Result<ProducerDecision, StreamResponse> {
2181 let Some(producer) = producer else {
2182 return Ok(ProducerDecision::Accept);
2183 };
2184 let Some(states) = self.producers.get(stream_id) else {
2185 return Ok(ProducerDecision::Accept);
2186 };
2187 let Some(state) = states.get(&producer.producer_id) else {
2188 if producer.producer_seq == 0 {
2189 return Ok(ProducerDecision::Accept);
2190 }
2191 return Err(StreamResponse::error(
2192 StreamErrorCode::ProducerSeqConflict,
2193 format!(
2194 "producer '{}' expected sequence 0, received {}",
2195 producer.producer_id, producer.producer_seq
2196 ),
2197 ));
2198 };
2199
2200 if producer.producer_epoch < state.producer_epoch {
2201 return Err(StreamResponse::error(
2202 StreamErrorCode::ProducerEpochStale,
2203 format!(
2204 "producer '{}' epoch {} is stale; current epoch is {}",
2205 producer.producer_id, producer.producer_epoch, state.producer_epoch
2206 ),
2207 ));
2208 }
2209 if producer.producer_epoch > state.producer_epoch {
2210 if producer.producer_seq == 0 {
2211 return Ok(ProducerDecision::Accept);
2212 }
2213 return Err(StreamResponse::error(
2214 StreamErrorCode::InvalidProducer,
2215 format!(
2216 "producer '{}' new epoch {} must start at sequence 0",
2217 producer.producer_id, producer.producer_epoch
2218 ),
2219 ));
2220 }
2221
2222 if producer.producer_seq <= state.producer_seq {
2223 return Ok(ProducerDecision::Duplicate {
2224 offset: state.last_start_offset,
2225 next_offset: state.last_next_offset,
2226 closed: state.last_closed,
2227 producer: ProducerRequest {
2228 producer_id: producer.producer_id.clone(),
2229 producer_epoch: state.producer_epoch,
2230 producer_seq: state.producer_seq,
2231 },
2232 items: state.last_items.clone(),
2233 });
2234 }
2235 if producer.producer_seq == state.producer_seq + 1 {
2236 return Ok(ProducerDecision::Accept);
2237 }
2238 Err(StreamResponse::error(
2239 StreamErrorCode::ProducerSeqConflict,
2240 format!(
2241 "producer '{}' expected sequence {}, received {}",
2242 producer.producer_id,
2243 state.producer_seq + 1,
2244 producer.producer_seq
2245 ),
2246 ))
2247 }
2248
2249 fn record_producer_success(
2250 &mut self,
2251 stream_id: BucketStreamId,
2252 producer: ProducerRequest,
2253 last: ProducerAppendRecord,
2254 last_items: Vec<ProducerAppendRecord>,
2255 ) {
2256 self.producers.entry(stream_id).or_default().insert(
2257 producer.producer_id,
2258 ProducerState {
2259 producer_epoch: producer.producer_epoch,
2260 producer_seq: producer.producer_seq,
2261 last_start_offset: last.start_offset,
2262 last_next_offset: last.next_offset,
2263 last_closed: last.closed,
2264 last_items,
2265 },
2266 );
2267 }
2268}
2269
/// Arguments for creating a stream whose initial payload is supplied inline.
///
/// Built from the fields of `StreamCommand::CreateStream` and passed to
/// `StreamStateMachine::create_stream` as a single value.
#[derive(Debug)]
struct CreateStreamInput {
    /// Bucket-qualified id of the stream to create.
    stream_id: BucketStreamId,
    content_type: String,
    /// Inline bytes for the stream at creation (may be empty — TODO confirm).
    initial_payload: Vec<u8>,
    close_after: bool,
    /// Optional caller sequence token (compare `check_stream_seq`).
    stream_seq: Option<String>,
    /// Optional idempotent-producer metadata for the creating write.
    producer: Option<ProducerRequest>,
    // Note: `validate_retention` rejects setting both of the next two fields.
    stream_ttl_seconds: Option<u64>,
    stream_expires_at_ms: Option<u64>,
    /// Presumably the source stream when creating a fork — confirm.
    forked_from: Option<BucketStreamId>,
    /// Presumably the offset in `forked_from` at which the fork starts — confirm.
    fork_offset: Option<u64>,
    /// Current time in milliseconds as supplied with the command.
    now_ms: u64,
}
2284
/// Arguments for creating a stream whose initial payload is a reference to
/// externally stored bytes (`ExternalPayloadRef`) rather than inline bytes.
///
/// Apart from `initial_payload`, the fields mirror `CreateStreamInput`.
#[derive(Debug)]
struct CreateExternalStreamInput {
    stream_id: BucketStreamId,
    content_type: String,
    /// External object holding the initial bytes (see `validate_external_payload_ref`).
    initial_payload: ExternalPayloadRef,
    close_after: bool,
    stream_seq: Option<String>,
    producer: Option<ProducerRequest>,
    stream_ttl_seconds: Option<u64>,
    stream_expires_at_ms: Option<u64>,
    forked_from: Option<BucketStreamId>,
    fork_offset: Option<u64>,
    now_ms: u64,
}
2299
/// Result of checking an append's producer metadata in
/// `StreamStateMachine::evaluate_producer`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProducerDecision {
    /// The append may proceed.
    Accept,
    /// The request's sequence is at or below the producer's recorded sequence
    /// for the current epoch. The fields describe the most recently recorded
    /// append, presumably so callers can answer without appending again.
    Duplicate {
        /// Recorded start offset of the last append.
        offset: u64,
        /// Recorded next offset after the last append.
        next_offset: u64,
        /// Recorded `closed` flag of the last append.
        closed: bool,
        /// Producer id with the recorded (not the requested) epoch and sequence.
        producer: ProducerRequest,
        /// Recorded per-item results of the last append.
        items: Vec<ProducerAppendRecord>,
    },
}
2311
2312impl CreateStreamInput {
2313 fn initial_len(&self) -> u64 {
2314 u64::try_from(self.initial_payload.len()).expect("payload len fits u64")
2315 }
2316}
2317
2318fn status_from_closed(closed: bool) -> StreamStatus {
2319 if closed {
2320 StreamStatus::Closed
2321 } else {
2322 StreamStatus::Open
2323 }
2324}
2325
2326fn is_soft_deleted(stream: &StreamMetadata) -> bool {
2327 stream.status == StreamStatus::SoftDeleted
2328}
2329
2330fn validate_retention(
2331 stream_ttl_seconds: Option<u64>,
2332 stream_expires_at_ms: Option<u64>,
2333) -> Result<(), StreamResponse> {
2334 if stream_ttl_seconds.is_some() && stream_expires_at_ms.is_some() {
2335 return Err(StreamResponse::error(
2336 StreamErrorCode::InvalidRetention,
2337 "stream ttl and expires-at cannot both be set",
2338 ));
2339 }
2340 if let Some(ttl_seconds) = stream_ttl_seconds
2341 && ttl_seconds.checked_mul(1000).is_none()
2342 {
2343 return Err(StreamResponse::error(
2344 StreamErrorCode::InvalidRetention,
2345 "stream ttl overflows millisecond range",
2346 ));
2347 }
2348 Ok(())
2349}
2350
2351fn stream_expiry_at_ms(stream: &StreamMetadata) -> Option<u64> {
2352 if let Some(expires_at_ms) = stream.stream_expires_at_ms {
2353 return Some(expires_at_ms);
2354 }
2355 stream.stream_ttl_seconds.map(|ttl_seconds| {
2356 stream
2357 .last_ttl_touch_at_ms
2358 .saturating_add(ttl_seconds.saturating_mul(1000))
2359 })
2360}
2361
2362fn stream_is_expired(stream: &StreamMetadata, now_ms: u64) -> bool {
2363 stream_expiry_at_ms(stream).is_some_and(|expires_at_ms| now_ms >= expires_at_ms)
2364}
2365
2366fn renew_stream_ttl(stream: &mut StreamMetadata, now_ms: u64) {
2367 if stream.stream_ttl_seconds.is_some() && stream.stream_expires_at_ms.is_none() {
2368 stream.last_ttl_touch_at_ms = now_ms;
2369 }
2370}
2371
2372fn check_stream_seq(stream: &StreamMetadata, incoming: Option<&str>) -> Result<(), StreamResponse> {
2373 let Some(incoming) = incoming else {
2374 return Ok(());
2375 };
2376 if let Some(last) = stream.last_stream_seq.as_deref()
2377 && incoming <= last
2378 {
2379 return Err(StreamResponse::error_with_next_offset(
2380 StreamErrorCode::StreamSeqConflict,
2381 format!("stream sequence '{incoming}' is not greater than last sequence '{last}'"),
2382 stream.tail_offset,
2383 ));
2384 }
2385 Ok(())
2386}
2387
2388fn validate_producer_request(producer: Option<&ProducerRequest>) -> Result<(), StreamResponse> {
2389 let Some(producer) = producer else {
2390 return Ok(());
2391 };
2392 if producer.producer_id.trim().is_empty() {
2393 return Err(StreamResponse::error(
2394 StreamErrorCode::InvalidProducer,
2395 "producer id must not be empty",
2396 ));
2397 }
2398 const MAX_JS_SAFE_INTEGER: u64 = 9_007_199_254_740_991;
2399 if producer.producer_epoch > MAX_JS_SAFE_INTEGER {
2400 return Err(StreamResponse::error(
2401 StreamErrorCode::InvalidProducer,
2402 format!(
2403 "producer epoch {} exceeds maximum {}",
2404 producer.producer_epoch, MAX_JS_SAFE_INTEGER
2405 ),
2406 ));
2407 }
2408 if producer.producer_seq > MAX_JS_SAFE_INTEGER {
2409 return Err(StreamResponse::error(
2410 StreamErrorCode::InvalidProducer,
2411 format!(
2412 "producer sequence {} exceeds maximum {}",
2413 producer.producer_seq, MAX_JS_SAFE_INTEGER
2414 ),
2415 ));
2416 }
2417 Ok(())
2418}
2419
2420fn validate_external_payload_ref(payload: &ExternalPayloadRef) -> Result<(), StreamResponse> {
2421 if payload.s3_path.trim().is_empty() {
2422 return Err(StreamResponse::error(
2423 StreamErrorCode::InvalidColdFlush,
2424 "external payload S3 path must not be empty",
2425 ));
2426 }
2427 if payload.payload_len == 0 {
2428 return Err(StreamResponse::error(
2429 StreamErrorCode::EmptyAppend,
2430 "external payload length must be greater than zero",
2431 ));
2432 }
2433 if payload.object_size < payload.payload_len {
2434 return Err(StreamResponse::error(
2435 StreamErrorCode::InvalidColdFlush,
2436 "external payload object size must cover payload length",
2437 ));
2438 }
2439 Ok(())
2440}
2441
2442fn restore_producer_states(
2443 stream_id: &BucketStreamId,
2444 snapshots: Vec<ProducerSnapshot>,
2445) -> Result<HashMap<String, ProducerState>, StreamSnapshotError> {
2446 let mut states = HashMap::with_capacity(snapshots.len());
2447 for snapshot in snapshots {
2448 if states
2449 .insert(
2450 snapshot.producer_id.clone(),
2451 ProducerState {
2452 producer_epoch: snapshot.producer_epoch,
2453 producer_seq: snapshot.producer_seq,
2454 last_start_offset: snapshot.last_start_offset,
2455 last_next_offset: snapshot.last_next_offset,
2456 last_closed: snapshot.last_closed,
2457 last_items: snapshot.last_items,
2458 },
2459 )
2460 .is_some()
2461 {
2462 return Err(StreamSnapshotError::DuplicateProducer {
2463 stream_id: stream_id.clone(),
2464 producer_id: snapshot.producer_id,
2465 });
2466 }
2467 }
2468 Ok(states)
2469}
2470
2471fn valid_cold_chunk_ref(chunk: &ColdChunkRef) -> bool {
2472 chunk.end_offset > chunk.start_offset
2473 && !chunk.s3_path.trim().is_empty()
2474 && chunk.object_size >= chunk.end_offset - chunk.start_offset
2475}
2476
2477fn valid_object_payload_ref(object: &ObjectPayloadRef) -> bool {
2478 object.end_offset > object.start_offset
2479 && !object.s3_path.trim().is_empty()
2480 && object.object_size >= object.end_offset - object.start_offset
2481}
2482
2483fn hot_segments_match_payload(segments: &[HotPayloadSegment], payload_len: usize) -> bool {
2484 let mut expected_payload_start = 0;
2485 for segment in segments {
2486 if segment.end_offset <= segment.start_offset
2487 || segment.payload_start != expected_payload_start
2488 || segment.payload_end <= segment.payload_start
2489 || segment.payload_end > payload_len
2490 {
2491 return false;
2492 }
2493 let Ok(logical_len) = usize::try_from(segment.end_offset - segment.start_offset) else {
2494 return false;
2495 };
2496 if logical_len != segment.payload_end - segment.payload_start {
2497 return false;
2498 }
2499 expected_payload_start = segment.payload_end;
2500 }
2501 expected_payload_start == payload_len
2502}
2503
2504fn payload_sources_cover_retained_suffix(
2505 cold_chunks: &[ColdChunkRef],
2506 external_segments: &[ObjectPayloadRef],
2507 hot_segments: &[HotPayloadSegment],
2508 retained_offset: u64,
2509 tail_offset: u64,
2510) -> bool {
2511 if tail_offset < retained_offset {
2512 return false;
2513 }
2514 let mut ranges =
2515 Vec::with_capacity(cold_chunks.len() + external_segments.len() + hot_segments.len());
2516 for chunk in cold_chunks {
2517 if !valid_cold_chunk_ref(chunk) {
2518 return false;
2519 }
2520 ranges.push((chunk.start_offset, chunk.end_offset));
2521 }
2522 for object in external_segments {
2523 if !valid_object_payload_ref(object) {
2524 return false;
2525 }
2526 ranges.push((object.start_offset, object.end_offset));
2527 }
2528 for segment in hot_segments {
2529 if segment.end_offset <= segment.start_offset {
2530 return false;
2531 }
2532 ranges.push((segment.start_offset, segment.end_offset));
2533 }
2534 ranges.sort_unstable();
2535
2536 let mut expected_start = retained_offset;
2537 for (start_offset, end_offset) in ranges {
2538 if end_offset <= expected_start {
2539 continue;
2540 }
2541 if start_offset > expected_start {
2542 return false;
2543 }
2544 expected_start = end_offset;
2545 if expected_start >= tail_offset {
2546 return true;
2547 }
2548 }
2549 expected_start == tail_offset
2550}
2551
2552fn segments_cover_range(
2553 segments: &[(u64, StreamReadSegment)],
2554 offset: u64,
2555 next_offset: u64,
2556) -> bool {
2557 if next_offset < offset {
2558 return false;
2559 }
2560 let mut expected_start = offset;
2561 for (segment_start, segment) in segments {
2562 let Some(segment_end) = read_segment_end(*segment_start, segment) else {
2563 return false;
2564 };
2565 if segment_end <= expected_start {
2566 continue;
2567 }
2568 if *segment_start > expected_start {
2569 return false;
2570 }
2571 expected_start = segment_end;
2572 if expected_start >= next_offset {
2573 return true;
2574 }
2575 }
2576 expected_start == next_offset
2577}
2578
2579fn read_segment_end(segment_start: u64, segment: &StreamReadSegment) -> Option<u64> {
2580 match segment {
2581 StreamReadSegment::Object(object) => {
2582 if object.len == 0
2583 || object.read_start_offset != segment_start
2584 || object.read_start_offset < object.object.start_offset
2585 {
2586 return None;
2587 }
2588 let len = u64::try_from(object.len).ok()?;
2589 let segment_end = object.read_start_offset.checked_add(len)?;
2590 if segment_end > object.object.end_offset {
2591 return None;
2592 }
2593 Some(segment_end)
2594 }
2595 StreamReadSegment::Hot(payload) => {
2596 if payload.is_empty() {
2597 return None;
2598 }
2599 let len = u64::try_from(payload.len()).ok()?;
2600 segment_start.checked_add(len)
2601 }
2602 }
2603}
2604
2605fn message_records_cover_retained_suffix(
2606 records: &[StreamMessageRecord],
2607 retained_offset: u64,
2608 tail_offset: u64,
2609) -> bool {
2610 let mut expected_start = retained_offset;
2611 for record in records {
2612 if record.start_offset != expected_start || record.end_offset <= record.start_offset {
2613 return false;
2614 }
2615 expected_start = record.end_offset;
2616 }
2617 expected_start == tail_offset
2618}
2619
2620fn compare_stream_ids(left: &BucketStreamId, right: &BucketStreamId) -> std::cmp::Ordering {
2621 left.bucket_id
2622 .cmp(&right.bucket_id)
2623 .then_with(|| left.stream_id.cmp(&right.stream_id))
2624}
2625
// Unit tests for this module live in a separate `tests` module file and are
// compiled only for test builds.
#[cfg(test)]
mod tests;