1use std::collections::{HashMap, HashSet};
2
3use bytes::Bytes;
4use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardId, ShardPlacement};
5use ursula_stream::{
6 AppendStreamInput, ProducerRequest, StreamCommand, StreamErrorCode, StreamMessageRecord,
7 StreamReadPlan, StreamReadSegment, StreamResponse, StreamSnapshot, StreamStateMachine,
8};
9
10use crate::cold_store::{ColdStoreHandle, DEFAULT_CONTENT_TYPE};
11use crate::command::{GroupSnapshot, GroupWriteCommand};
12use crate::engine::{
13 GroupAppendBatchFuture, GroupAppendBatchResponse, GroupAppendFuture,
14 GroupBootstrapStreamFuture, GroupCloseStreamFuture, GroupColdHotBacklogFuture,
15 GroupCreateStreamFuture, GroupDeleteSnapshotFuture, GroupDeleteStreamFuture, GroupEngine,
16 GroupEngineCreateFuture, GroupEngineError, GroupEngineFactory, GroupEngineMetrics,
17 GroupFlushColdFuture, GroupForkRefFuture, GroupHeadStreamFuture, GroupInstallSnapshotFuture,
18 GroupPlanColdFlushFuture, GroupPlanNextColdFlushBatchFuture, GroupPlanNextColdFlushFuture,
19 GroupPublishSnapshotFuture, GroupReadSnapshotFuture, GroupReadStreamFuture,
20 GroupReadStreamPartsFuture, GroupSnapshotFuture, GroupTouchStreamAccessFuture,
21 GroupWriteResponse,
22};
23use crate::request::{
24 AppendBatchRequest, AppendExternalRequest, AppendRequest, AppendResponse,
25 BootstrapStreamRequest, BootstrapStreamResponse, BootstrapUpdate, CloseStreamRequest,
26 CloseStreamResponse, ColdHotBacklog, ColdWriteAdmission, CreateStreamExternalRequest,
27 CreateStreamRequest, CreateStreamResponse, DeleteSnapshotRequest, DeleteStreamRequest,
28 DeleteStreamResponse, FlushColdRequest, FlushColdResponse, ForkRefResponse,
29 GroupReadStreamParts, HeadStreamRequest, HeadStreamResponse, PlanColdFlushRequest,
30 PlanGroupColdFlushRequest, PublishSnapshotRequest, PublishSnapshotResponse,
31 ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest, StreamAppendCount,
32 TouchStreamAccessResponse,
33};
34
35pub(crate) struct AppendPayloadInput<'a> {
36 stream_id: BucketStreamId,
37 content_type: Option<&'a str>,
38 payload: &'a [u8],
39 close_after: bool,
40 stream_seq: Option<String>,
41 producer: Option<ProducerRequest>,
42 now_ms: u64,
43}
44
45#[derive(Debug, Clone, Default)]
46pub struct InMemoryGroupEngine {
47 pub(crate) commit_index: u64,
48 pub(crate) state_machine: StreamStateMachine,
49 pub(crate) stream_append_counts: HashMap<BucketStreamId, u64>,
50 pub(crate) cold_store: Option<ColdStoreHandle>,
51}
52
53impl InMemoryGroupEngine {
54 pub fn with_cold_store(cold_store: ColdStoreHandle) -> Self {
55 Self {
56 cold_store: Some(cold_store),
57 ..Self::default()
58 }
59 }
60
61 pub fn cold_store(&self) -> Option<ColdStoreHandle> {
62 self.cold_store.clone()
63 }
64
65 pub fn apply_committed_write(
66 &mut self,
67 command: GroupWriteCommand,
68 placement: ShardPlacement,
69 ) -> Result<GroupWriteResponse, GroupEngineError> {
70 match command {
71 GroupWriteCommand::CreateStream {
72 stream_id,
73 content_type,
74 initial_payload,
75 close_after,
76 stream_seq,
77 producer,
78 stream_ttl_seconds,
79 stream_expires_at_ms,
80 forked_from,
81 fork_offset,
82 now_ms,
83 } => {
84 ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
85 let response = self.state_machine.apply(StreamCommand::CreateStream {
86 stream_id,
87 content_type,
88 initial_payload: initial_payload.to_vec(),
89 close_after,
90 stream_seq,
91 producer,
92 stream_ttl_seconds,
93 stream_expires_at_ms,
94 forked_from,
95 fork_offset,
96 now_ms,
97 });
98 match response {
99 StreamResponse::Created {
100 next_offset,
101 closed,
102 ..
103 } => {
104 self.commit_index += 1;
105 Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
106 placement,
107 next_offset,
108 closed,
109 already_exists: false,
110 group_commit_index: self.commit_index,
111 }))
112 }
113 StreamResponse::AlreadyExists {
114 next_offset,
115 closed,
116 ..
117 } => Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
118 placement,
119 next_offset,
120 closed,
121 already_exists: true,
122 group_commit_index: self.commit_index,
123 })),
124 StreamResponse::Error {
125 code,
126 message,
127 next_offset,
128 } => Err(GroupEngineError::stream_with_next_offset(
129 code,
130 message,
131 next_offset,
132 )),
133 other => Err(GroupEngineError::new(format!(
134 "unexpected create stream response: {other:?}"
135 ))),
136 }
137 }
138 GroupWriteCommand::CreateExternal {
139 stream_id,
140 content_type,
141 initial_payload,
142 close_after,
143 stream_seq,
144 producer,
145 stream_ttl_seconds,
146 stream_expires_at_ms,
147 forked_from,
148 fork_offset,
149 now_ms,
150 } => {
151 ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
152 let response = self.state_machine.apply(StreamCommand::CreateExternal {
153 stream_id,
154 content_type,
155 initial_payload,
156 close_after,
157 stream_seq,
158 producer,
159 stream_ttl_seconds,
160 stream_expires_at_ms,
161 forked_from,
162 fork_offset,
163 now_ms,
164 });
165 match response {
166 StreamResponse::Created {
167 next_offset,
168 closed,
169 ..
170 } => {
171 self.commit_index += 1;
172 Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
173 placement,
174 next_offset,
175 closed,
176 already_exists: false,
177 group_commit_index: self.commit_index,
178 }))
179 }
180 StreamResponse::AlreadyExists {
181 next_offset,
182 closed,
183 ..
184 } => Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
185 placement,
186 next_offset,
187 closed,
188 already_exists: true,
189 group_commit_index: self.commit_index,
190 })),
191 StreamResponse::Error {
192 code,
193 message,
194 next_offset,
195 } => Err(GroupEngineError::stream_with_next_offset(
196 code,
197 message,
198 next_offset,
199 )),
200 other => Err(GroupEngineError::new(format!(
201 "unexpected create external stream response: {other:?}"
202 ))),
203 }
204 }
205 GroupWriteCommand::Append {
206 stream_id,
207 content_type,
208 payload,
209 close_after,
210 stream_seq,
211 producer,
212 now_ms,
213 } => self
214 .append_payload(
215 AppendPayloadInput {
216 stream_id,
217 content_type: Some(&content_type),
218 payload: &payload,
219 close_after,
220 stream_seq,
221 producer,
222 now_ms,
223 },
224 placement,
225 )
226 .map(GroupWriteResponse::Append),
227 GroupWriteCommand::AppendExternal {
228 stream_id,
229 content_type,
230 payload,
231 close_after,
232 stream_seq,
233 producer,
234 now_ms,
235 } => {
236 let response = self.state_machine.apply(StreamCommand::AppendExternal {
237 stream_id: stream_id.clone(),
238 content_type: Some(content_type),
239 payload,
240 close_after,
241 stream_seq,
242 producer,
243 now_ms,
244 });
245 match response {
246 StreamResponse::Appended {
247 offset,
248 next_offset,
249 closed,
250 deduplicated,
251 producer,
252 ..
253 } => {
254 let stream_append_count =
255 self.stream_append_counts.entry(stream_id).or_insert(0);
256 if !deduplicated {
257 self.commit_index += 1;
258 *stream_append_count += 1;
259 }
260 Ok(GroupWriteResponse::Append(AppendResponse {
261 placement,
262 start_offset: offset,
263 next_offset,
264 stream_append_count: *stream_append_count,
265 group_commit_index: self.commit_index,
266 closed,
267 deduplicated,
268 producer,
269 }))
270 }
271 StreamResponse::Error {
272 code,
273 message,
274 next_offset,
275 } => Err(GroupEngineError::stream_with_next_offset(
276 code,
277 message,
278 next_offset,
279 )),
280 other => Err(GroupEngineError::new(format!(
281 "unexpected append external response: {other:?}"
282 ))),
283 }
284 }
285 GroupWriteCommand::AppendBatch {
286 stream_id,
287 content_type,
288 payloads,
289 producer,
290 now_ms,
291 } => {
292 if producer.is_some() {
293 let payload_refs = payloads.iter().map(Bytes::as_ref).collect::<Vec<_>>();
294 let batch = self
295 .state_machine
296 .append_batch_borrowed(
297 stream_id.clone(),
298 Some(&content_type),
299 &payload_refs,
300 producer,
301 now_ms,
302 )
303 .map_err(stream_response_error)?;
304 let old_commit_index = self.commit_index;
305 let old_append_count = *self.stream_append_counts.get(&stream_id).unwrap_or(&0);
306 if !batch.deduplicated {
307 let count = u64::try_from(batch.items.len()).expect("item count fits u64");
308 self.commit_index += count;
309 *self.stream_append_counts.entry(stream_id).or_insert(0) += count;
310 }
311 let items = batch
312 .items
313 .into_iter()
314 .enumerate()
315 .map(|(index, item)| {
316 let item_index = u64::try_from(index + 1).expect("item index fits u64");
317 Ok(AppendResponse {
318 placement,
319 start_offset: item.offset,
320 next_offset: item.next_offset,
321 stream_append_count: if item.deduplicated {
322 old_append_count
323 } else {
324 old_append_count + item_index
325 },
326 group_commit_index: if item.deduplicated {
327 old_commit_index
328 } else {
329 old_commit_index + item_index
330 },
331 closed: item.closed,
332 deduplicated: item.deduplicated,
333 producer: None,
334 })
335 })
336 .collect();
337 return Ok(GroupWriteResponse::AppendBatch(GroupAppendBatchResponse {
338 placement,
339 items,
340 }));
341 }
342
343 let mut items = Vec::with_capacity(payloads.len());
344 for payload in payloads {
345 if payload.is_empty() {
346 items.push(Err(GroupEngineError::stream(
347 StreamErrorCode::EmptyAppend,
348 "append payload must be non-empty",
349 )));
350 continue;
351 }
352 items.push(self.append_payload(
353 AppendPayloadInput {
354 stream_id: stream_id.clone(),
355 content_type: Some(&content_type),
356 payload: &payload,
357 close_after: false,
358 stream_seq: None,
359 producer: None,
360 now_ms,
361 },
362 placement,
363 ));
364 }
365 Ok(GroupWriteResponse::AppendBatch(GroupAppendBatchResponse {
366 placement,
367 items,
368 }))
369 }
370 GroupWriteCommand::PublishSnapshot {
371 stream_id,
372 snapshot_offset,
373 content_type,
374 payload,
375 now_ms,
376 } => {
377 let response = self.state_machine.apply(StreamCommand::PublishSnapshot {
378 stream_id,
379 snapshot_offset,
380 content_type,
381 payload: payload.to_vec(),
382 now_ms,
383 });
384 match response {
385 StreamResponse::SnapshotPublished { snapshot_offset } => {
386 self.commit_index += 1;
387 Ok(GroupWriteResponse::PublishSnapshot(
388 PublishSnapshotResponse {
389 placement,
390 snapshot_offset,
391 group_commit_index: self.commit_index,
392 },
393 ))
394 }
395 StreamResponse::Error {
396 code,
397 message,
398 next_offset,
399 } => Err(GroupEngineError::stream_with_next_offset(
400 code,
401 message,
402 next_offset,
403 )),
404 other => Err(GroupEngineError::new(format!(
405 "unexpected publish snapshot response: {other:?}"
406 ))),
407 }
408 }
409 GroupWriteCommand::TouchStreamAccess {
410 stream_id,
411 now_ms,
412 renew_ttl,
413 } => {
414 let response = self.state_machine.apply(StreamCommand::TouchStreamAccess {
415 stream_id,
416 now_ms,
417 renew_ttl,
418 });
419 match response {
420 StreamResponse::Accessed { changed, expired } => {
421 if changed || expired {
422 self.commit_index += 1;
423 }
424 Ok(GroupWriteResponse::TouchStreamAccess(
425 TouchStreamAccessResponse {
426 placement,
427 changed,
428 expired,
429 group_commit_index: self.commit_index,
430 },
431 ))
432 }
433 StreamResponse::Error {
434 code,
435 message,
436 next_offset,
437 } => Err(GroupEngineError::stream_with_next_offset(
438 code,
439 message,
440 next_offset,
441 )),
442 other => Err(GroupEngineError::new(format!(
443 "unexpected touch stream access response: {other:?}"
444 ))),
445 }
446 }
447 GroupWriteCommand::AddForkRef { stream_id, now_ms } => {
448 let response = self
449 .state_machine
450 .apply(StreamCommand::AddForkRef { stream_id, now_ms });
451 match response {
452 StreamResponse::ForkRefAdded { fork_ref_count } => {
453 self.commit_index += 1;
454 Ok(GroupWriteResponse::AddForkRef(ForkRefResponse {
455 placement,
456 fork_ref_count,
457 hard_deleted: false,
458 parent_to_release: None,
459 group_commit_index: self.commit_index,
460 }))
461 }
462 StreamResponse::Error {
463 code,
464 message,
465 next_offset,
466 } => Err(GroupEngineError::stream_with_next_offset(
467 code,
468 message,
469 next_offset,
470 )),
471 other => Err(GroupEngineError::new(format!(
472 "unexpected add fork ref response: {other:?}"
473 ))),
474 }
475 }
476 GroupWriteCommand::ReleaseForkRef { stream_id } => {
477 let response = self
478 .state_machine
479 .apply(StreamCommand::ReleaseForkRef { stream_id });
480 match response {
481 StreamResponse::ForkRefReleased {
482 hard_deleted,
483 fork_ref_count,
484 parent_to_release,
485 } => {
486 self.commit_index += 1;
487 Ok(GroupWriteResponse::ReleaseForkRef(ForkRefResponse {
488 placement,
489 fork_ref_count,
490 hard_deleted,
491 parent_to_release,
492 group_commit_index: self.commit_index,
493 }))
494 }
495 StreamResponse::Error {
496 code,
497 message,
498 next_offset,
499 } => Err(GroupEngineError::stream_with_next_offset(
500 code,
501 message,
502 next_offset,
503 )),
504 other => Err(GroupEngineError::new(format!(
505 "unexpected release fork ref response: {other:?}"
506 ))),
507 }
508 }
509 GroupWriteCommand::FlushCold { stream_id, chunk } => {
510 let response = self
511 .state_machine
512 .apply(StreamCommand::FlushCold { stream_id, chunk });
513 match response {
514 StreamResponse::ColdFlushed { hot_start_offset } => {
515 self.commit_index += 1;
516 Ok(GroupWriteResponse::FlushCold(FlushColdResponse {
517 placement,
518 hot_start_offset,
519 group_commit_index: self.commit_index,
520 }))
521 }
522 StreamResponse::Error {
523 code,
524 message,
525 next_offset,
526 } => Err(GroupEngineError::stream_with_next_offset(
527 code,
528 message,
529 next_offset,
530 )),
531 other => Err(GroupEngineError::new(format!(
532 "unexpected flush cold response: {other:?}"
533 ))),
534 }
535 }
536 GroupWriteCommand::CloseStream {
537 stream_id,
538 stream_seq,
539 producer,
540 now_ms,
541 } => {
542 let response = self.state_machine.apply(StreamCommand::Close {
543 stream_id,
544 stream_seq,
545 producer,
546 now_ms,
547 });
548 match response {
549 StreamResponse::Closed {
550 next_offset,
551 deduplicated,
552 ..
553 } => {
554 if !deduplicated {
555 self.commit_index += 1;
556 }
557 Ok(GroupWriteResponse::CloseStream(CloseStreamResponse {
558 placement,
559 next_offset,
560 group_commit_index: self.commit_index,
561 deduplicated,
562 }))
563 }
564 StreamResponse::Error {
565 code,
566 message,
567 next_offset,
568 } => Err(GroupEngineError::stream_with_next_offset(
569 code,
570 message,
571 next_offset,
572 )),
573 other => Err(GroupEngineError::new(format!(
574 "unexpected close stream response: {other:?}"
575 ))),
576 }
577 }
578 GroupWriteCommand::DeleteStream { stream_id } => {
579 let response = self
580 .state_machine
581 .apply(StreamCommand::DeleteStream { stream_id });
582 match response {
583 StreamResponse::Deleted {
584 hard_deleted,
585 parent_to_release,
586 } => {
587 self.commit_index += 1;
588 Ok(GroupWriteResponse::DeleteStream(DeleteStreamResponse {
589 placement,
590 group_commit_index: self.commit_index,
591 hard_deleted,
592 parent_to_release,
593 }))
594 }
595 StreamResponse::Error {
596 code,
597 message,
598 next_offset,
599 } => Err(GroupEngineError::stream_with_next_offset(
600 code,
601 message,
602 next_offset,
603 )),
604 other => Err(GroupEngineError::new(format!(
605 "unexpected delete stream response: {other:?}"
606 ))),
607 }
608 }
609 GroupWriteCommand::Batch { commands } => Ok(GroupWriteResponse::Batch(
610 self.apply_committed_write_batch(commands, placement),
611 )),
612 }
613 }
614
615 pub(crate) fn cold_hot_backlog_for(
616 &self,
617 stream_id: BucketStreamId,
618 ) -> Result<ColdHotBacklog, GroupEngineError> {
619 let stream_hot_bytes = self.state_machine.hot_payload_len(&stream_id).unwrap_or(0);
620 Ok(ColdHotBacklog {
621 stream_id,
622 stream_hot_bytes,
623 group_hot_bytes: self.state_machine.total_hot_payload_bytes(),
624 })
625 }
626
627 pub(crate) fn enforce_cold_write_admission(
628 &self,
629 stream_id: &BucketStreamId,
630 admission: ColdWriteAdmission,
631 before_group_hot_bytes: u64,
632 after_group_hot_bytes: u64,
633 mutating: bool,
634 ) -> Result<(), GroupEngineError> {
635 let Some(limit) = admission.max_hot_bytes_per_group else {
636 return Ok(());
637 };
638 if !mutating || after_group_hot_bytes <= limit {
639 return Ok(());
640 }
641 Err(GroupEngineError::new(format!(
642 "ColdBackpressure: stream '{stream_id}' would raise group hot bytes from {before_group_hot_bytes} to {after_group_hot_bytes}, above limit {limit}"
643 )))
644 }
645
646 pub(crate) fn create_stream_with_admission_inner(
647 &mut self,
648 request: CreateStreamRequest,
649 placement: ShardPlacement,
650 admission: ColdWriteAdmission,
651 ) -> Result<CreateStreamResponse, GroupEngineError> {
652 let stream_id = request.stream_id.clone();
653 let command = GroupWriteCommand::from(request);
654 let before = self.state_machine.total_hot_payload_bytes();
655 let mut preview = self.clone();
656 let response = match preview.apply_committed_write(command, placement)? {
657 GroupWriteResponse::CreateStream(response) => response,
658 other => {
659 return Err(GroupEngineError::new(format!(
660 "unexpected create stream write response: {other:?}"
661 )));
662 }
663 };
664 preview.enforce_cold_write_admission(
665 &stream_id,
666 admission,
667 before,
668 preview.state_machine.total_hot_payload_bytes(),
669 !response.already_exists,
670 )?;
671 *self = preview;
672 Ok(response)
673 }
674
675 pub(crate) fn append_with_admission_inner(
676 &mut self,
677 request: AppendRequest,
678 placement: ShardPlacement,
679 admission: ColdWriteAdmission,
680 ) -> Result<AppendResponse, GroupEngineError> {
681 let stream_id = request.stream_id.clone();
682 let command = GroupWriteCommand::from(request);
683 let before = self.state_machine.total_hot_payload_bytes();
684 let mut preview = self.clone();
685 let response = match preview.apply_committed_write(command, placement)? {
686 GroupWriteResponse::Append(response) => response,
687 other => {
688 return Err(GroupEngineError::new(format!(
689 "unexpected append write response: {other:?}"
690 )));
691 }
692 };
693 preview.enforce_cold_write_admission(
694 &stream_id,
695 admission,
696 before,
697 preview.state_machine.total_hot_payload_bytes(),
698 !response.deduplicated,
699 )?;
700 *self = preview;
701 Ok(response)
702 }
703
704 pub(crate) fn append_batch_with_admission_inner(
705 &mut self,
706 request: AppendBatchRequest,
707 placement: ShardPlacement,
708 admission: ColdWriteAdmission,
709 ) -> Result<GroupAppendBatchResponse, GroupEngineError> {
710 let stream_id = request.stream_id.clone();
711 let command = GroupWriteCommand::from(request);
712 let before = self.state_machine.total_hot_payload_bytes();
713 let mut preview = self.clone();
714 let response = match preview.apply_committed_write(command, placement)? {
715 GroupWriteResponse::AppendBatch(response) => response,
716 other => {
717 return Err(GroupEngineError::new(format!(
718 "unexpected append batch write response: {other:?}"
719 )));
720 }
721 };
722 let mutating = response
723 .items
724 .iter()
725 .any(|item| matches!(item, Ok(response) if !response.deduplicated));
726 preview.enforce_cold_write_admission(
727 &stream_id,
728 admission,
729 before,
730 preview.state_machine.total_hot_payload_bytes(),
731 mutating,
732 )?;
733 *self = preview;
734 Ok(response)
735 }
736
737 pub fn access_requires_write(
738 &self,
739 stream_id: &BucketStreamId,
740 now_ms: u64,
741 renew_ttl: bool,
742 ) -> Result<bool, GroupEngineError> {
743 self.state_machine
744 .access_requires_write(stream_id, now_ms, renew_ttl)
745 .map_err(stream_response_error)
746 }
747
748 pub(crate) fn apply_access_command(
749 &mut self,
750 stream_id: BucketStreamId,
751 now_ms: u64,
752 renew_ttl: bool,
753 placement: ShardPlacement,
754 ) -> Result<TouchStreamAccessResponse, GroupEngineError> {
755 match self.apply_committed_write(
756 GroupWriteCommand::TouchStreamAccess {
757 stream_id,
758 now_ms,
759 renew_ttl,
760 },
761 placement,
762 )? {
763 GroupWriteResponse::TouchStreamAccess(response) => Ok(response),
764 other => Err(GroupEngineError::new(format!(
765 "unexpected touch stream access write response: {other:?}"
766 ))),
767 }
768 }
769
770 pub(crate) fn ensure_stream_access(
771 &mut self,
772 stream_id: &BucketStreamId,
773 now_ms: u64,
774 renew_ttl: bool,
775 placement: ShardPlacement,
776 ) -> Result<Option<TouchStreamAccessResponse>, GroupEngineError> {
777 if !self.access_requires_write(stream_id, now_ms, renew_ttl)? {
778 return Ok(None);
779 }
780 let response =
781 self.apply_access_command(stream_id.clone(), now_ms, renew_ttl, placement)?;
782 if response.expired {
783 return Err(GroupEngineError::stream(
784 StreamErrorCode::StreamNotFound,
785 format!("stream '{stream_id}' does not exist"),
786 ));
787 }
788 Ok(Some(response))
789 }
790
791 pub fn apply_committed_write_batch(
792 &mut self,
793 commands: Vec<GroupWriteCommand>,
794 placement: ShardPlacement,
795 ) -> Vec<Result<GroupWriteResponse, GroupEngineError>> {
796 commands
797 .into_iter()
798 .map(|command| self.apply_committed_write(command, placement))
799 .collect()
800 }
801
802 pub(crate) fn apply_replayed_write_command(
803 &mut self,
804 command: GroupWriteCommand,
805 ) -> Result<(), GroupEngineError> {
806 let placement = ShardPlacement {
807 core_id: CoreId(0),
808 shard_id: ShardId(0),
809 raft_group_id: RaftGroupId(0),
810 };
811 self.apply_committed_write(command, placement).map(|_| ())
812 }
813
814 pub(crate) fn apply_replayed_command(
815 &mut self,
816 command: StreamCommand,
817 ) -> Result<(), GroupEngineError> {
818 match command {
819 StreamCommand::CreateBucket { bucket_id } => {
820 match self
821 .state_machine
822 .apply(StreamCommand::CreateBucket { bucket_id })
823 {
824 StreamResponse::BucketCreated { .. } => {
825 self.commit_index += 1;
826 Ok(())
827 }
828 StreamResponse::BucketAlreadyExists { .. } => Ok(()),
829 StreamResponse::Error {
830 code,
831 message,
832 next_offset,
833 } => Err(GroupEngineError::stream_with_next_offset(
834 code,
835 message,
836 next_offset,
837 )),
838 other => Err(GroupEngineError::new(format!(
839 "unexpected replay create bucket response: {other:?}"
840 ))),
841 }
842 }
843 StreamCommand::DeleteBucket { bucket_id } => {
844 match self
845 .state_machine
846 .apply(StreamCommand::DeleteBucket { bucket_id })
847 {
848 StreamResponse::BucketDeleted { .. } => {
849 self.commit_index += 1;
850 Ok(())
851 }
852 StreamResponse::Error {
853 code,
854 message,
855 next_offset,
856 } => Err(GroupEngineError::stream_with_next_offset(
857 code,
858 message,
859 next_offset,
860 )),
861 other => Err(GroupEngineError::new(format!(
862 "unexpected replay delete bucket response: {other:?}"
863 ))),
864 }
865 }
866 StreamCommand::CreateStream {
867 stream_id,
868 content_type,
869 initial_payload,
870 close_after,
871 stream_seq,
872 producer,
873 stream_ttl_seconds,
874 stream_expires_at_ms,
875 forked_from,
876 fork_offset,
877 now_ms,
878 } => {
879 ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
880 let response = self.state_machine.apply(StreamCommand::CreateStream {
881 stream_id,
882 content_type,
883 initial_payload,
884 close_after,
885 stream_seq,
886 producer,
887 stream_ttl_seconds,
888 stream_expires_at_ms,
889 forked_from,
890 fork_offset,
891 now_ms,
892 });
893 match response {
894 StreamResponse::Created { .. } => {
895 self.commit_index += 1;
896 Ok(())
897 }
898 StreamResponse::AlreadyExists { .. } => Ok(()),
899 StreamResponse::Error {
900 code,
901 message,
902 next_offset,
903 } => Err(GroupEngineError::stream_with_next_offset(
904 code,
905 message,
906 next_offset,
907 )),
908 other => Err(GroupEngineError::new(format!(
909 "unexpected replay create stream response: {other:?}"
910 ))),
911 }
912 }
913 StreamCommand::CreateExternal {
914 stream_id,
915 content_type,
916 initial_payload,
917 close_after,
918 stream_seq,
919 producer,
920 stream_ttl_seconds,
921 stream_expires_at_ms,
922 forked_from,
923 fork_offset,
924 now_ms,
925 } => {
926 ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
927 let response = self.state_machine.apply(StreamCommand::CreateExternal {
928 stream_id,
929 content_type,
930 initial_payload,
931 close_after,
932 stream_seq,
933 producer,
934 stream_ttl_seconds,
935 stream_expires_at_ms,
936 forked_from,
937 fork_offset,
938 now_ms,
939 });
940 match response {
941 StreamResponse::Created { .. } => {
942 self.commit_index += 1;
943 Ok(())
944 }
945 StreamResponse::AlreadyExists { .. } => Ok(()),
946 StreamResponse::Error {
947 code,
948 message,
949 next_offset,
950 } => Err(GroupEngineError::stream_with_next_offset(
951 code,
952 message,
953 next_offset,
954 )),
955 other => Err(GroupEngineError::new(format!(
956 "unexpected replay external create stream response: {other:?}"
957 ))),
958 }
959 }
960 StreamCommand::Append {
961 stream_id,
962 content_type,
963 payload,
964 close_after,
965 stream_seq,
966 producer,
967 now_ms,
968 } => {
969 let stream_count_key = stream_id.clone();
970 let response = self.state_machine.apply(StreamCommand::Append {
971 stream_id,
972 content_type,
973 payload,
974 close_after,
975 stream_seq,
976 producer,
977 now_ms,
978 });
979 match response {
980 StreamResponse::Appended { deduplicated, .. } => {
981 if !deduplicated {
982 self.commit_index += 1;
983 *self
984 .stream_append_counts
985 .entry(stream_count_key)
986 .or_insert(0) += 1;
987 }
988 Ok(())
989 }
990 StreamResponse::Closed { deduplicated, .. } => {
991 if !deduplicated {
992 self.commit_index += 1;
993 }
994 Ok(())
995 }
996 StreamResponse::Error {
997 code,
998 message,
999 next_offset,
1000 } => Err(GroupEngineError::stream_with_next_offset(
1001 code,
1002 message,
1003 next_offset,
1004 )),
1005 other => Err(GroupEngineError::new(format!(
1006 "unexpected replay append response: {other:?}"
1007 ))),
1008 }
1009 }
1010 StreamCommand::AppendExternal {
1011 stream_id,
1012 content_type,
1013 payload,
1014 close_after,
1015 stream_seq,
1016 producer,
1017 now_ms,
1018 } => {
1019 let stream_count_key = stream_id.clone();
1020 let response = self.state_machine.apply(StreamCommand::AppendExternal {
1021 stream_id,
1022 content_type,
1023 payload,
1024 close_after,
1025 stream_seq,
1026 producer,
1027 now_ms,
1028 });
1029 match response {
1030 StreamResponse::Appended { deduplicated, .. } => {
1031 if !deduplicated {
1032 self.commit_index += 1;
1033 *self
1034 .stream_append_counts
1035 .entry(stream_count_key)
1036 .or_insert(0) += 1;
1037 }
1038 Ok(())
1039 }
1040 StreamResponse::Error {
1041 code,
1042 message,
1043 next_offset,
1044 } => Err(GroupEngineError::stream_with_next_offset(
1045 code,
1046 message,
1047 next_offset,
1048 )),
1049 other => Err(GroupEngineError::new(format!(
1050 "unexpected replay external append response: {other:?}"
1051 ))),
1052 }
1053 }
1054 StreamCommand::AppendBatch {
1055 stream_id,
1056 content_type,
1057 payloads,
1058 producer,
1059 now_ms,
1060 } => {
1061 let stream_count_key = stream_id.clone();
1062 let payload_refs = payloads.iter().map(Vec::as_slice).collect::<Vec<_>>();
1063 let response = self
1064 .state_machine
1065 .append_batch_borrowed(
1066 stream_id,
1067 content_type.as_deref(),
1068 &payload_refs,
1069 producer,
1070 now_ms,
1071 )
1072 .map_err(stream_response_error)?;
1073 if !response.deduplicated {
1074 let count = u64::try_from(response.items.len()).expect("item count fits u64");
1075 self.commit_index += count;
1076 *self
1077 .stream_append_counts
1078 .entry(stream_count_key)
1079 .or_insert(0) += count;
1080 }
1081 Ok(())
1082 }
1083 StreamCommand::PublishSnapshot {
1084 stream_id,
1085 snapshot_offset,
1086 content_type,
1087 payload,
1088 now_ms,
1089 } => {
1090 let response = self.state_machine.apply(StreamCommand::PublishSnapshot {
1091 stream_id,
1092 snapshot_offset,
1093 content_type,
1094 payload,
1095 now_ms,
1096 });
1097 match response {
1098 StreamResponse::SnapshotPublished { .. } => {
1099 self.commit_index += 1;
1100 Ok(())
1101 }
1102 StreamResponse::Error {
1103 code,
1104 message,
1105 next_offset,
1106 } => Err(GroupEngineError::stream_with_next_offset(
1107 code,
1108 message,
1109 next_offset,
1110 )),
1111 other => Err(GroupEngineError::new(format!(
1112 "unexpected replay publish snapshot response: {other:?}"
1113 ))),
1114 }
1115 }
1116 StreamCommand::TouchStreamAccess {
1117 stream_id,
1118 now_ms,
1119 renew_ttl,
1120 } => {
1121 let response = self.state_machine.apply(StreamCommand::TouchStreamAccess {
1122 stream_id,
1123 now_ms,
1124 renew_ttl,
1125 });
1126 match response {
1127 StreamResponse::Accessed { changed, expired } => {
1128 if changed || expired {
1129 self.commit_index += 1;
1130 }
1131 Ok(())
1132 }
1133 StreamResponse::Error {
1134 code,
1135 message,
1136 next_offset,
1137 } => Err(GroupEngineError::stream_with_next_offset(
1138 code,
1139 message,
1140 next_offset,
1141 )),
1142 other => Err(GroupEngineError::new(format!(
1143 "unexpected replay touch stream access response: {other:?}"
1144 ))),
1145 }
1146 }
1147 StreamCommand::AddForkRef { stream_id, now_ms } => {
1148 let response = self
1149 .state_machine
1150 .apply(StreamCommand::AddForkRef { stream_id, now_ms });
1151 match response {
1152 StreamResponse::ForkRefAdded { .. } => {
1153 self.commit_index += 1;
1154 Ok(())
1155 }
1156 StreamResponse::Error {
1157 code,
1158 message,
1159 next_offset,
1160 } => Err(GroupEngineError::stream_with_next_offset(
1161 code,
1162 message,
1163 next_offset,
1164 )),
1165 other => Err(GroupEngineError::new(format!(
1166 "unexpected replay add fork ref response: {other:?}"
1167 ))),
1168 }
1169 }
1170 StreamCommand::ReleaseForkRef { stream_id } => {
1171 let response = self
1172 .state_machine
1173 .apply(StreamCommand::ReleaseForkRef { stream_id });
1174 match response {
1175 StreamResponse::ForkRefReleased { .. } => {
1176 self.commit_index += 1;
1177 Ok(())
1178 }
1179 StreamResponse::Error {
1180 code,
1181 message,
1182 next_offset,
1183 } => Err(GroupEngineError::stream_with_next_offset(
1184 code,
1185 message,
1186 next_offset,
1187 )),
1188 other => Err(GroupEngineError::new(format!(
1189 "unexpected replay release fork ref response: {other:?}"
1190 ))),
1191 }
1192 }
1193 StreamCommand::FlushCold { stream_id, chunk } => {
1194 let response = self
1195 .state_machine
1196 .apply(StreamCommand::FlushCold { stream_id, chunk });
1197 match response {
1198 StreamResponse::ColdFlushed { .. } => {
1199 self.commit_index += 1;
1200 Ok(())
1201 }
1202 StreamResponse::Error {
1203 code,
1204 message,
1205 next_offset,
1206 } => Err(GroupEngineError::stream_with_next_offset(
1207 code,
1208 message,
1209 next_offset,
1210 )),
1211 other => Err(GroupEngineError::new(format!(
1212 "unexpected replay flush cold response: {other:?}"
1213 ))),
1214 }
1215 }
1216 StreamCommand::Close {
1217 stream_id,
1218 stream_seq,
1219 producer,
1220 now_ms,
1221 } => {
1222 let response = self.state_machine.apply(StreamCommand::Close {
1223 stream_id,
1224 stream_seq,
1225 producer,
1226 now_ms,
1227 });
1228 match response {
1229 StreamResponse::Closed { deduplicated, .. } => {
1230 if !deduplicated {
1231 self.commit_index += 1;
1232 }
1233 Ok(())
1234 }
1235 StreamResponse::Error {
1236 code,
1237 message,
1238 next_offset,
1239 } => Err(GroupEngineError::stream_with_next_offset(
1240 code,
1241 message,
1242 next_offset,
1243 )),
1244 other => Err(GroupEngineError::new(format!(
1245 "unexpected replay close stream response: {other:?}"
1246 ))),
1247 }
1248 }
1249 StreamCommand::DeleteStream { stream_id } => {
1250 let response = self
1251 .state_machine
1252 .apply(StreamCommand::DeleteStream { stream_id });
1253 match response {
1254 StreamResponse::Deleted { .. } => {
1255 self.commit_index += 1;
1256 Ok(())
1257 }
1258 StreamResponse::Error {
1259 code,
1260 message,
1261 next_offset,
1262 } => Err(GroupEngineError::stream_with_next_offset(
1263 code,
1264 message,
1265 next_offset,
1266 )),
1267 other => Err(GroupEngineError::new(format!(
1268 "unexpected replay delete stream response: {other:?}"
1269 ))),
1270 }
1271 }
1272 }
1273 }
1274
1275 pub(crate) fn append_payload(
1276 &mut self,
1277 input: AppendPayloadInput<'_>,
1278 placement: ShardPlacement,
1279 ) -> Result<AppendResponse, GroupEngineError> {
1280 let AppendPayloadInput {
1281 stream_id,
1282 content_type,
1283 payload,
1284 close_after,
1285 stream_seq,
1286 producer,
1287 now_ms,
1288 } = input;
1289 let stream_count_key = stream_id.clone();
1290 let response = self.state_machine.append_borrowed(AppendStreamInput {
1291 stream_id,
1292 content_type,
1293 payload,
1294 close_after,
1295 stream_seq,
1296 producer,
1297 now_ms,
1298 });
1299 match response {
1300 StreamResponse::Appended {
1301 offset,
1302 next_offset,
1303 closed,
1304 deduplicated,
1305 producer,
1306 ..
1307 } => {
1308 let stream_append_count = self
1309 .stream_append_counts
1310 .entry(stream_count_key)
1311 .or_insert(0);
1312 if !deduplicated {
1313 self.commit_index += 1;
1314 *stream_append_count += 1;
1315 }
1316 Ok(AppendResponse {
1317 placement,
1318 start_offset: offset,
1319 next_offset,
1320 stream_append_count: *stream_append_count,
1321 group_commit_index: self.commit_index,
1322 closed,
1323 deduplicated,
1324 producer,
1325 })
1326 }
1327 StreamResponse::Error {
1328 code,
1329 message,
1330 next_offset,
1331 } => Err(GroupEngineError::stream_with_next_offset(
1332 code,
1333 message,
1334 next_offset,
1335 )),
1336 other => Err(GroupEngineError::new(format!(
1337 "unexpected append response: {other:?}"
1338 ))),
1339 }
1340 }
1341
1342 pub fn read_stream_plan(
1343 &mut self,
1344 request: &ReadStreamRequest,
1345 placement: ShardPlacement,
1346 ) -> Result<StreamReadPlan, GroupEngineError> {
1347 self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1348 self.read_stream_plan_after_access(request)
1349 }
1350
1351 pub fn read_stream_plan_after_access(
1352 &self,
1353 request: &ReadStreamRequest,
1354 ) -> Result<StreamReadPlan, GroupEngineError> {
1355 self.state_machine
1356 .read_plan_at(
1357 &request.stream_id,
1358 request.offset,
1359 request.max_len,
1360 request.now_ms,
1361 )
1362 .map_err(stream_response_error)
1363 }
1364
1365 pub fn head_stream_after_access(
1366 &mut self,
1367 request: &HeadStreamRequest,
1368 placement: ShardPlacement,
1369 ) -> Result<HeadStreamResponse, GroupEngineError> {
1370 let Some(metadata) = self
1371 .state_machine
1372 .head_at(&request.stream_id, request.now_ms)
1373 else {
1374 return Err(GroupEngineError::stream(
1375 StreamErrorCode::StreamNotFound,
1376 format!("stream '{}' does not exist", request.stream_id),
1377 ));
1378 };
1379 Ok(HeadStreamResponse {
1380 placement,
1381 content_type: metadata.content_type.clone(),
1382 tail_offset: metadata.tail_offset,
1383 closed: metadata.status == ursula_stream::StreamStatus::Closed,
1384 stream_ttl_seconds: metadata.stream_ttl_seconds,
1385 stream_expires_at_ms: metadata.stream_expires_at_ms,
1386 snapshot_offset: self
1387 .state_machine
1388 .latest_snapshot(&request.stream_id)
1389 .map_err(stream_response_error)?
1390 .map(|snapshot| snapshot.offset),
1391 })
1392 }
1393
1394 pub async fn read_payload_from_plan(
1395 cold_store: Option<&ColdStoreHandle>,
1396 stream_id: &BucketStreamId,
1397 plan: &StreamReadPlan,
1398 ) -> Result<Vec<u8>, GroupEngineError> {
1399 let mut payload = Vec::new();
1400 for segment in &plan.segments {
1401 match segment {
1402 StreamReadSegment::Hot(bytes) => payload.extend_from_slice(bytes),
1403 StreamReadSegment::Object(segment) => {
1404 let Some(cold_store) = cold_store else {
1405 return Err(GroupEngineError::stream_with_next_offset(
1406 StreamErrorCode::InvalidColdFlush,
1407 format!("stream '{stream_id}' read requires object payload store"),
1408 Some(plan.next_offset),
1409 ));
1410 };
1411 let bytes = cold_store
1412 .read_object_range(&segment.object, segment.read_start_offset, segment.len)
1413 .await
1414 .map_err(|err| GroupEngineError::new(err.to_string()))?;
1415 payload.extend_from_slice(&bytes);
1416 }
1417 }
1418 }
1419 Ok(payload)
1420 }
1421
1422 pub(crate) async fn read_own_payload_from_plan(
1423 &self,
1424 stream_id: &BucketStreamId,
1425 plan: &StreamReadPlan,
1426 ) -> Result<Vec<u8>, GroupEngineError> {
1427 Self::read_payload_from_plan(self.cold_store.as_ref(), stream_id, plan).await
1428 }
1429
1430 pub(crate) async fn bootstrap_updates(
1431 &self,
1432 stream_id: &BucketStreamId,
1433 records: &[StreamMessageRecord],
1434 content_type: &str,
1435 now_ms: u64,
1436 ) -> Result<Vec<BootstrapUpdate>, GroupEngineError> {
1437 let mut updates = Vec::with_capacity(records.len());
1438 for record in records {
1439 let len = usize::try_from(record.end_offset - record.start_offset).map_err(|_| {
1440 GroupEngineError::stream(
1441 StreamErrorCode::InvalidSnapshot,
1442 format!(
1443 "bootstrap message [{}..{}) for stream '{stream_id}' is too large",
1444 record.start_offset, record.end_offset
1445 ),
1446 )
1447 })?;
1448 let plan = self
1449 .state_machine
1450 .read_plan_at(stream_id, record.start_offset, len, now_ms)
1451 .map_err(stream_response_error)?;
1452 let payload = self.read_own_payload_from_plan(stream_id, &plan).await?;
1453 updates.push(BootstrapUpdate {
1454 start_offset: record.start_offset,
1455 next_offset: record.end_offset,
1456 content_type: content_type.to_owned(),
1457 payload,
1458 });
1459 }
1460 Ok(updates)
1461 }
1462
1463 pub(crate) fn build_snapshot(&self, placement: ShardPlacement) -> GroupSnapshot {
1464 GroupSnapshot {
1465 placement,
1466 group_commit_index: self.commit_index,
1467 stream_snapshot: self.state_machine.snapshot(),
1468 stream_append_counts: self.stream_append_counts_snapshot(),
1469 }
1470 }
1471
1472 pub(crate) fn stream_append_counts_snapshot(&self) -> Vec<StreamAppendCount> {
1473 let mut counts = self
1474 .stream_append_counts
1475 .iter()
1476 .map(|(stream_id, append_count)| StreamAppendCount {
1477 stream_id: stream_id.clone(),
1478 append_count: *append_count,
1479 })
1480 .collect::<Vec<_>>();
1481 counts.sort_by(|left, right| compare_stream_ids(&left.stream_id, &right.stream_id));
1482 counts
1483 }
1484
1485 pub(crate) fn install_snapshot_inner(
1486 &mut self,
1487 snapshot: GroupSnapshot,
1488 ) -> Result<(), GroupEngineError> {
1489 let GroupSnapshot {
1490 placement: _,
1491 group_commit_index,
1492 stream_snapshot,
1493 stream_append_counts,
1494 } = snapshot;
1495 self.install_snapshot_parts(group_commit_index, stream_snapshot, stream_append_counts)
1496 }
1497
1498 pub(crate) fn install_snapshot_parts(
1499 &mut self,
1500 group_commit_index: u64,
1501 stream_snapshot: StreamSnapshot,
1502 stream_append_counts: Vec<StreamAppendCount>,
1503 ) -> Result<(), GroupEngineError> {
1504 let stream_ids = stream_snapshot
1505 .streams
1506 .iter()
1507 .map(|entry| entry.metadata.stream_id.clone())
1508 .collect::<HashSet<_>>();
1509 let state_machine = StreamStateMachine::restore(stream_snapshot)
1510 .map_err(|err| GroupEngineError::new(format!("restore stream snapshot: {err}")))?;
1511 let stream_append_counts = restore_stream_append_counts(stream_append_counts, &stream_ids)?;
1512
1513 self.commit_index = group_commit_index;
1514 self.state_machine = state_machine;
1515 self.stream_append_counts = stream_append_counts;
1516 Ok(())
1517 }
1518}
1519
1520impl GroupEngine for InMemoryGroupEngine {
1521 fn create_stream<'a>(
1522 &'a mut self,
1523 request: CreateStreamRequest,
1524 placement: ShardPlacement,
1525 ) -> GroupCreateStreamFuture<'a> {
1526 let command = GroupWriteCommand::from(request);
1527 Box::pin(async move {
1528 match self.apply_committed_write(command, placement)? {
1529 GroupWriteResponse::CreateStream(response) => Ok(response),
1530 other => Err(GroupEngineError::new(format!(
1531 "unexpected create stream write response: {other:?}"
1532 ))),
1533 }
1534 })
1535 }
1536
1537 fn create_stream_with_cold_admission<'a>(
1538 &'a mut self,
1539 request: CreateStreamRequest,
1540 placement: ShardPlacement,
1541 admission: ColdWriteAdmission,
1542 ) -> GroupCreateStreamFuture<'a> {
1543 if !admission.is_enabled() {
1544 return self.create_stream(request, placement);
1545 }
1546 Box::pin(
1547 async move { self.create_stream_with_admission_inner(request, placement, admission) },
1548 )
1549 }
1550
1551 fn create_stream_external<'a>(
1552 &'a mut self,
1553 request: CreateStreamExternalRequest,
1554 placement: ShardPlacement,
1555 ) -> GroupCreateStreamFuture<'a> {
1556 let command = GroupWriteCommand::from(request);
1557 Box::pin(async move {
1558 match self.apply_committed_write(command, placement)? {
1559 GroupWriteResponse::CreateStream(response) => Ok(response),
1560 other => Err(GroupEngineError::new(format!(
1561 "unexpected external create stream write response: {other:?}"
1562 ))),
1563 }
1564 })
1565 }
1566
1567 fn read_stream<'a>(
1568 &'a mut self,
1569 request: ReadStreamRequest,
1570 placement: ShardPlacement,
1571 ) -> GroupReadStreamFuture<'a> {
1572 Box::pin(async move {
1573 self.read_stream_parts(request, placement)
1574 .await?
1575 .into_response()
1576 .await
1577 })
1578 }
1579
1580 fn read_stream_parts<'a>(
1581 &'a mut self,
1582 request: ReadStreamRequest,
1583 placement: ShardPlacement,
1584 ) -> GroupReadStreamPartsFuture<'a> {
1585 Box::pin(async move {
1586 let stream_id = request.stream_id.clone();
1587 let plan = self.read_stream_plan(&request, placement)?;
1588 Ok(GroupReadStreamParts::from_plan(
1589 placement,
1590 stream_id,
1591 plan,
1592 self.cold_store(),
1593 ))
1594 })
1595 }
1596
1597 fn publish_snapshot<'a>(
1598 &'a mut self,
1599 request: PublishSnapshotRequest,
1600 placement: ShardPlacement,
1601 ) -> GroupPublishSnapshotFuture<'a> {
1602 Box::pin(async move {
1603 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1604 let command = GroupWriteCommand::from(request);
1605 match self.apply_committed_write(command, placement)? {
1606 GroupWriteResponse::PublishSnapshot(response) => Ok(response),
1607 other => Err(GroupEngineError::new(format!(
1608 "unexpected publish snapshot write response: {other:?}"
1609 ))),
1610 }
1611 })
1612 }
1613
1614 fn read_snapshot<'a>(
1615 &'a mut self,
1616 request: ReadSnapshotRequest,
1617 placement: ShardPlacement,
1618 ) -> GroupReadSnapshotFuture<'a> {
1619 Box::pin(async move {
1620 self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1621 let snapshot = match request.snapshot_offset {
1622 Some(offset) => self
1623 .state_machine
1624 .read_snapshot(&request.stream_id, offset)
1625 .map_err(stream_response_error)?,
1626 None => self
1627 .state_machine
1628 .latest_snapshot(&request.stream_id)
1629 .map_err(stream_response_error)?
1630 .ok_or_else(|| {
1631 GroupEngineError::stream(
1632 StreamErrorCode::SnapshotNotFound,
1633 format!("stream '{}' has no visible snapshot", request.stream_id),
1634 )
1635 })?,
1636 };
1637 let tail_offset = self
1638 .state_machine
1639 .head_at(&request.stream_id, request.now_ms)
1640 .map(|metadata| metadata.tail_offset)
1641 .unwrap_or(snapshot.offset);
1642 Ok(ReadSnapshotResponse {
1643 placement,
1644 snapshot_offset: snapshot.offset,
1645 next_offset: snapshot.offset,
1646 content_type: snapshot.content_type,
1647 payload: snapshot.payload,
1648 up_to_date: snapshot.offset == tail_offset,
1649 })
1650 })
1651 }
1652
    /// Runs the stream access check and then asks the state machine to delete
    /// the snapshot at `request.snapshot_offset`.
    ///
    /// A state-machine `Error` response is forwarded with its code, message and
    /// next offset.
    ///
    /// NOTE(review): both match arms below return `Err`, so as written no
    /// state-machine response is mapped to a successful result. Confirm whether
    /// `StreamStateMachine::delete_snapshot` can only ever return an error, or
    /// whether a success arm is missing here.
    fn delete_snapshot<'a>(
        &'a mut self,
        request: DeleteSnapshotRequest,
        placement: ShardPlacement,
    ) -> GroupDeleteSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
            match self
                .state_machine
                .delete_snapshot(&request.stream_id, request.snapshot_offset)
            {
                // Stream-level failure: keep the code and next offset intact.
                StreamResponse::Error {
                    code,
                    message,
                    next_offset,
                } => Err(GroupEngineError::stream_with_next_offset(
                    code,
                    message,
                    next_offset,
                )),
                // Every other response, whatever it is, is treated as unexpected.
                other => Err(GroupEngineError::new(format!(
                    "unexpected delete snapshot response: {other:?}"
                ))),
            }
        })
    }
1679
1680 fn bootstrap_stream<'a>(
1681 &'a mut self,
1682 request: BootstrapStreamRequest,
1683 placement: ShardPlacement,
1684 ) -> GroupBootstrapStreamFuture<'a> {
1685 Box::pin(async move {
1686 self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1687 let plan = self
1688 .state_machine
1689 .bootstrap_plan(&request.stream_id)
1690 .map_err(stream_response_error)?;
1691 let snapshot_offset = plan.snapshot.as_ref().map(|snapshot| snapshot.offset);
1692 let snapshot_content_type = plan
1693 .snapshot
1694 .as_ref()
1695 .map(|snapshot| snapshot.content_type.clone())
1696 .unwrap_or_else(|| DEFAULT_CONTENT_TYPE.to_owned());
1697 let snapshot_payload = plan
1698 .snapshot
1699 .as_ref()
1700 .map(|snapshot| snapshot.payload.clone())
1701 .unwrap_or_default();
1702 let updates = self
1703 .bootstrap_updates(
1704 &request.stream_id,
1705 &plan.updates,
1706 &plan.content_type,
1707 request.now_ms,
1708 )
1709 .await?;
1710 Ok(BootstrapStreamResponse {
1711 placement,
1712 snapshot_offset,
1713 snapshot_content_type,
1714 snapshot_payload,
1715 updates,
1716 next_offset: plan.next_offset,
1717 up_to_date: plan.up_to_date,
1718 closed: plan.closed,
1719 })
1720 })
1721 }
1722
1723 fn touch_stream_access<'a>(
1724 &'a mut self,
1725 stream_id: BucketStreamId,
1726 now_ms: u64,
1727 renew_ttl: bool,
1728 placement: ShardPlacement,
1729 ) -> GroupTouchStreamAccessFuture<'a> {
1730 Box::pin(async move { self.apply_access_command(stream_id, now_ms, renew_ttl, placement) })
1731 }
1732
1733 fn add_fork_ref<'a>(
1734 &'a mut self,
1735 stream_id: BucketStreamId,
1736 now_ms: u64,
1737 placement: ShardPlacement,
1738 ) -> GroupForkRefFuture<'a> {
1739 Box::pin(async move {
1740 match self.apply_committed_write(
1741 GroupWriteCommand::AddForkRef { stream_id, now_ms },
1742 placement,
1743 )? {
1744 GroupWriteResponse::AddForkRef(response) => Ok(response),
1745 other => Err(GroupEngineError::new(format!(
1746 "unexpected add fork ref write response: {other:?}"
1747 ))),
1748 }
1749 })
1750 }
1751
1752 fn release_fork_ref<'a>(
1753 &'a mut self,
1754 stream_id: BucketStreamId,
1755 placement: ShardPlacement,
1756 ) -> GroupForkRefFuture<'a> {
1757 Box::pin(async move {
1758 match self
1759 .apply_committed_write(GroupWriteCommand::ReleaseForkRef { stream_id }, placement)?
1760 {
1761 GroupWriteResponse::ReleaseForkRef(response) => Ok(response),
1762 other => Err(GroupEngineError::new(format!(
1763 "unexpected release fork ref write response: {other:?}"
1764 ))),
1765 }
1766 })
1767 }
1768
1769 fn head_stream<'a>(
1770 &'a mut self,
1771 request: HeadStreamRequest,
1772 placement: ShardPlacement,
1773 ) -> GroupHeadStreamFuture<'a> {
1774 Box::pin(async move {
1775 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1776 self.head_stream_after_access(&request, placement)
1777 })
1778 }
1779
1780 fn close_stream<'a>(
1781 &'a mut self,
1782 request: CloseStreamRequest,
1783 placement: ShardPlacement,
1784 ) -> GroupCloseStreamFuture<'a> {
1785 Box::pin(async move {
1786 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1787 let command = GroupWriteCommand::from(request);
1788 match self.apply_committed_write(command, placement)? {
1789 GroupWriteResponse::CloseStream(response) => Ok(response),
1790 other => Err(GroupEngineError::new(format!(
1791 "unexpected close stream write response: {other:?}"
1792 ))),
1793 }
1794 })
1795 }
1796
1797 fn delete_stream<'a>(
1798 &'a mut self,
1799 request: DeleteStreamRequest,
1800 placement: ShardPlacement,
1801 ) -> GroupDeleteStreamFuture<'a> {
1802 let command = GroupWriteCommand::from(request);
1803 Box::pin(async move {
1804 match self.apply_committed_write(command, placement)? {
1805 GroupWriteResponse::DeleteStream(response) => Ok(response),
1806 other => Err(GroupEngineError::new(format!(
1807 "unexpected delete stream write response: {other:?}"
1808 ))),
1809 }
1810 })
1811 }
1812
1813 fn append<'a>(
1814 &'a mut self,
1815 request: AppendRequest,
1816 placement: ShardPlacement,
1817 ) -> GroupAppendFuture<'a> {
1818 Box::pin(async move {
1819 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1820 let command = GroupWriteCommand::from(request);
1821 match self.apply_committed_write(command, placement)? {
1822 GroupWriteResponse::Append(response) => Ok(response),
1823 other => Err(GroupEngineError::new(format!(
1824 "unexpected append write response: {other:?}"
1825 ))),
1826 }
1827 })
1828 }
1829
1830 fn append_with_cold_admission<'a>(
1831 &'a mut self,
1832 request: AppendRequest,
1833 placement: ShardPlacement,
1834 admission: ColdWriteAdmission,
1835 ) -> GroupAppendFuture<'a> {
1836 if !admission.is_enabled() {
1837 return self.append(request, placement);
1838 }
1839 Box::pin(async move { self.append_with_admission_inner(request, placement, admission) })
1840 }
1841
1842 fn append_external<'a>(
1843 &'a mut self,
1844 request: AppendExternalRequest,
1845 placement: ShardPlacement,
1846 ) -> GroupAppendFuture<'a> {
1847 Box::pin(async move {
1848 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1849 let command = GroupWriteCommand::from(request);
1850 match self.apply_committed_write(command, placement)? {
1851 GroupWriteResponse::Append(response) => Ok(response),
1852 other => Err(GroupEngineError::new(format!(
1853 "unexpected external append write response: {other:?}"
1854 ))),
1855 }
1856 })
1857 }
1858
1859 fn append_batch<'a>(
1860 &'a mut self,
1861 request: AppendBatchRequest,
1862 placement: ShardPlacement,
1863 ) -> GroupAppendBatchFuture<'a> {
1864 Box::pin(async move {
1865 self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1866 let command = GroupWriteCommand::from(request);
1867 match self.apply_committed_write(command, placement)? {
1868 GroupWriteResponse::AppendBatch(response) => Ok(response),
1869 other => Err(GroupEngineError::new(format!(
1870 "unexpected append batch write response: {other:?}"
1871 ))),
1872 }
1873 })
1874 }
1875
1876 fn append_batch_with_cold_admission<'a>(
1877 &'a mut self,
1878 request: AppendBatchRequest,
1879 placement: ShardPlacement,
1880 admission: ColdWriteAdmission,
1881 ) -> GroupAppendBatchFuture<'a> {
1882 if !admission.is_enabled() {
1883 return self.append_batch(request, placement);
1884 }
1885 Box::pin(
1886 async move { self.append_batch_with_admission_inner(request, placement, admission) },
1887 )
1888 }
1889
1890 fn flush_cold<'a>(
1891 &'a mut self,
1892 request: FlushColdRequest,
1893 placement: ShardPlacement,
1894 ) -> GroupFlushColdFuture<'a> {
1895 let command = GroupWriteCommand::from(request);
1896 Box::pin(async move {
1897 match self.apply_committed_write(command, placement)? {
1898 GroupWriteResponse::FlushCold(response) => Ok(response),
1899 other => Err(GroupEngineError::new(format!(
1900 "unexpected flush cold write response: {other:?}"
1901 ))),
1902 }
1903 })
1904 }
1905
1906 fn plan_cold_flush<'a>(
1907 &'a mut self,
1908 request: PlanColdFlushRequest,
1909 _placement: ShardPlacement,
1910 ) -> GroupPlanColdFlushFuture<'a> {
1911 Box::pin(async move {
1912 self.state_machine
1913 .plan_cold_flush(
1914 &request.stream_id,
1915 request.min_hot_bytes,
1916 request.max_flush_bytes,
1917 )
1918 .map_err(stream_response_error)
1919 })
1920 }
1921
1922 fn plan_next_cold_flush<'a>(
1923 &'a mut self,
1924 request: PlanGroupColdFlushRequest,
1925 _placement: ShardPlacement,
1926 ) -> GroupPlanNextColdFlushFuture<'a> {
1927 Box::pin(async move {
1928 self.state_machine
1929 .plan_next_cold_flush(request.min_hot_bytes, request.max_flush_bytes)
1930 .map_err(stream_response_error)
1931 })
1932 }
1933
1934 fn plan_next_cold_flush_batch<'a>(
1935 &'a mut self,
1936 request: PlanGroupColdFlushRequest,
1937 _placement: ShardPlacement,
1938 max_candidates: usize,
1939 ) -> GroupPlanNextColdFlushBatchFuture<'a> {
1940 Box::pin(async move {
1941 self.state_machine
1942 .plan_next_cold_flush_batch(
1943 request.min_hot_bytes,
1944 request.max_flush_bytes,
1945 max_candidates,
1946 )
1947 .map_err(stream_response_error)
1948 })
1949 }
1950
1951 fn cold_hot_backlog<'a>(
1952 &'a mut self,
1953 stream_id: BucketStreamId,
1954 _placement: ShardPlacement,
1955 ) -> GroupColdHotBacklogFuture<'a> {
1956 Box::pin(async move { self.cold_hot_backlog_for(stream_id) })
1957 }
1958
1959 fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a> {
1960 Box::pin(async move { Ok(self.build_snapshot(placement)) })
1961 }
1962
1963 fn install_snapshot<'a>(
1964 &'a mut self,
1965 snapshot: GroupSnapshot,
1966 ) -> GroupInstallSnapshotFuture<'a> {
1967 Box::pin(async move { self.install_snapshot_inner(snapshot) })
1968 }
1969}
1970
1971#[derive(Debug, Clone, Default)]
1972pub struct InMemoryGroupEngineFactory {
1973 cold_store: Option<ColdStoreHandle>,
1974}
1975
1976impl InMemoryGroupEngineFactory {
1977 pub fn new() -> Self {
1978 Self::default()
1979 }
1980
1981 pub fn with_cold_store(cold_store: Option<ColdStoreHandle>) -> Self {
1982 Self { cold_store }
1983 }
1984}
1985
1986impl GroupEngineFactory for InMemoryGroupEngineFactory {
1987 fn create<'a>(
1988 &'a self,
1989 _placement: ShardPlacement,
1990 _metrics: GroupEngineMetrics,
1991 ) -> GroupEngineCreateFuture<'a> {
1992 Box::pin(async move {
1993 let engine = InMemoryGroupEngine {
1994 cold_store: self.cold_store.clone(),
1995 ..InMemoryGroupEngine::default()
1996 };
1997 let engine: Box<dyn GroupEngine> = Box::new(engine);
1998 Ok(engine)
1999 })
2000 }
2001}
2002
2003pub(crate) fn compare_stream_ids(
2004 left: &BucketStreamId,
2005 right: &BucketStreamId,
2006) -> std::cmp::Ordering {
2007 left.bucket_id
2008 .cmp(&right.bucket_id)
2009 .then_with(|| left.stream_id.cmp(&right.stream_id))
2010}
2011pub(crate) fn ensure_bucket_exists(
2012 state_machine: &mut StreamStateMachine,
2013 stream_id: &BucketStreamId,
2014) -> Result<(), GroupEngineError> {
2015 if state_machine.bucket_exists(&stream_id.bucket_id) {
2016 return Ok(());
2017 }
2018
2019 match state_machine.apply(StreamCommand::CreateBucket {
2020 bucket_id: stream_id.bucket_id.clone(),
2021 }) {
2022 StreamResponse::BucketCreated { .. } | StreamResponse::BucketAlreadyExists { .. } => Ok(()),
2023 StreamResponse::Error {
2024 code,
2025 message,
2026 next_offset,
2027 } => Err(GroupEngineError::stream_with_next_offset(
2028 code,
2029 message,
2030 next_offset,
2031 )),
2032 other => Err(GroupEngineError::new(format!(
2033 "unexpected create bucket response: {other:?}"
2034 ))),
2035 }
2036}
2037
2038pub(crate) fn stream_response_error(response: StreamResponse) -> GroupEngineError {
2039 match response {
2040 StreamResponse::Error {
2041 code,
2042 message,
2043 next_offset,
2044 } => GroupEngineError::stream_with_next_offset(code, message, next_offset),
2045 other => GroupEngineError::new(format!("unexpected stream response error: {other:?}")),
2046 }
2047}
2048
2049pub(crate) fn restore_stream_append_counts(
2050 counts: Vec<StreamAppendCount>,
2051 snapshot_stream_ids: &HashSet<BucketStreamId>,
2052) -> Result<HashMap<BucketStreamId, u64>, GroupEngineError> {
2053 let mut restored = HashMap::with_capacity(counts.len());
2054 for count in counts {
2055 if !snapshot_stream_ids.contains(&count.stream_id) {
2056 return Err(GroupEngineError::new(format!(
2057 "append count references missing snapshot stream '{}'",
2058 count.stream_id
2059 )));
2060 }
2061 if restored
2062 .insert(count.stream_id.clone(), count.append_count)
2063 .is_some()
2064 {
2065 return Err(GroupEngineError::new(format!(
2066 "snapshot contains duplicate append count for stream '{}'",
2067 count.stream_id
2068 )));
2069 }
2070 }
2071 Ok(restored)
2072}