Skip to main content

ursula_runtime/
engine_in_memory.rs

1use std::collections::{HashMap, HashSet};
2
3use bytes::Bytes;
4use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardId, ShardPlacement};
5use ursula_stream::{
6    AppendStreamInput, ProducerRequest, StreamCommand, StreamErrorCode, StreamMessageRecord,
7    StreamReadPlan, StreamReadSegment, StreamResponse, StreamSnapshot, StreamStateMachine,
8};
9
10use crate::cold_store::{ColdStoreHandle, DEFAULT_CONTENT_TYPE};
11use crate::command::{GroupSnapshot, GroupWriteCommand};
12use crate::engine::{
13    GroupAppendBatchFuture, GroupAppendBatchResponse, GroupAppendFuture,
14    GroupBootstrapStreamFuture, GroupCloseStreamFuture, GroupColdHotBacklogFuture,
15    GroupCreateStreamFuture, GroupDeleteSnapshotFuture, GroupDeleteStreamFuture, GroupEngine,
16    GroupEngineCreateFuture, GroupEngineError, GroupEngineFactory, GroupEngineMetrics,
17    GroupFlushColdFuture, GroupForkRefFuture, GroupHeadStreamFuture, GroupInstallSnapshotFuture,
18    GroupPlanColdFlushFuture, GroupPlanNextColdFlushBatchFuture, GroupPlanNextColdFlushFuture,
19    GroupPublishSnapshotFuture, GroupReadSnapshotFuture, GroupReadStreamFuture,
20    GroupReadStreamPartsFuture, GroupSnapshotFuture, GroupTouchStreamAccessFuture,
21    GroupWriteResponse,
22};
23use crate::request::{
24    AppendBatchRequest, AppendExternalRequest, AppendRequest, AppendResponse,
25    BootstrapStreamRequest, BootstrapStreamResponse, BootstrapUpdate, CloseStreamRequest,
26    CloseStreamResponse, ColdHotBacklog, ColdWriteAdmission, CreateStreamExternalRequest,
27    CreateStreamRequest, CreateStreamResponse, DeleteSnapshotRequest, DeleteStreamRequest,
28    DeleteStreamResponse, FlushColdRequest, FlushColdResponse, ForkRefResponse,
29    GroupReadStreamParts, HeadStreamRequest, HeadStreamResponse, PlanColdFlushRequest,
30    PlanGroupColdFlushRequest, PublishSnapshotRequest, PublishSnapshotResponse,
31    ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest, StreamAppendCount,
32    TouchStreamAccessResponse,
33};
34
/// Argument bundle for appending one payload to a stream.
///
/// `apply_committed_write` builds this in its `Append` arm and in the
/// producer-less `AppendBatch` path, then hands it to `append_payload`.
/// The content type and payload are borrowed for `'a` rather than owned.
pub(crate) struct AppendPayloadInput<'a> {
    /// Stream the payload is appended to.
    stream_id: BucketStreamId,
    /// Content type accompanying the append, if any.
    content_type: Option<&'a str>,
    /// Raw bytes to append.
    payload: &'a [u8],
    /// NOTE(review): presumably asks for the stream to be closed once the
    /// append is applied; confirm against `append_payload`.
    close_after: bool,
    /// Optional stream sequence token forwarded from the write command.
    /// NOTE(review): exact semantics live in the stream state machine.
    stream_seq: Option<String>,
    /// Optional producer request forwarded from the write command.
    producer: Option<ProducerRequest>,
    /// Timestamp carried by the write command.
    /// NOTE(review): presumably milliseconds since the Unix epoch; confirm.
    now_ms: u64,
}
44
/// Group engine that keeps all of its state in process memory.
///
/// Write commands are applied to the embedded [`StreamStateMachine`]; the
/// engine additionally tracks a commit counter and per-stream append counts
/// that are echoed back in write responses.
#[derive(Debug, Clone, Default)]
pub struct InMemoryGroupEngine {
    /// Counter advanced by `apply_committed_write` whenever a command
    /// actually mutated state; reported to callers as `group_commit_index`.
    pub(crate) commit_index: u64,
    /// Stream state machine that every write command is applied to.
    pub(crate) state_machine: StreamStateMachine,
    /// Number of non-deduplicated appends recorded per stream.
    pub(crate) stream_append_counts: HashMap<BucketStreamId, u64>,
    /// Optional cold-store handle; `None` by default and set by
    /// `with_cold_store`.
    pub(crate) cold_store: Option<ColdStoreHandle>,
}
52
53impl InMemoryGroupEngine {
54    pub fn with_cold_store(cold_store: ColdStoreHandle) -> Self {
55        Self {
56            cold_store: Some(cold_store),
57            ..Self::default()
58        }
59    }
60
61    pub fn cold_store(&self) -> Option<ColdStoreHandle> {
62        self.cold_store.clone()
63    }
64
    /// Applies one committed write command to the in-memory state machine and
    /// builds the matching [`GroupWriteResponse`].
    ///
    /// `commit_index` is advanced only when the state machine reports an
    /// actual mutation: `AlreadyExists` creates, deduplicated appends and
    /// closes, and access touches that neither changed nor expired the stream
    /// leave it untouched. The `group_commit_index` placed in a response is
    /// the counter value after any such advance.
    ///
    /// # Errors
    ///
    /// * A `StreamResponse::Error` is converted with
    ///   `GroupEngineError::stream_with_next_offset`.
    /// * Any response variant an arm does not expect becomes a generic
    ///   `GroupEngineError` whose message embeds the response's `Debug` form.
    /// * Failures of `ensure_bucket_exists` and of the producer batch append
    ///   are propagated with `?`.
    ///
    /// The producer-less `AppendBatch` path and `Batch` do not fail as a
    /// whole; they report one `Result` per payload / per command instead.
    pub fn apply_committed_write(
        &mut self,
        command: GroupWriteCommand,
        placement: ShardPlacement,
    ) -> Result<GroupWriteResponse, GroupEngineError> {
        match command {
            GroupWriteCommand::CreateStream {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => {
                // Runs before the create is applied; a failure aborts the command.
                ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
                let response = self.state_machine.apply(StreamCommand::CreateStream {
                    stream_id,
                    content_type,
                    // The state machine command takes the payload via `to_vec()`.
                    initial_payload: initial_payload.to_vec(),
                    close_after,
                    stream_seq,
                    producer,
                    stream_ttl_seconds,
                    stream_expires_at_ms,
                    forked_from,
                    fork_offset,
                    now_ms,
                });
                match response {
                    StreamResponse::Created {
                        next_offset,
                        closed,
                        ..
                    } => {
                        // A fresh stream counts as one commit.
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
                            placement,
                            next_offset,
                            closed,
                            already_exists: false,
                            group_commit_index: self.commit_index,
                        }))
                    }
                    // Re-creating an existing stream reports the current index
                    // without advancing it.
                    StreamResponse::AlreadyExists {
                        next_offset,
                        closed,
                        ..
                    } => Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
                        placement,
                        next_offset,
                        closed,
                        already_exists: true,
                        group_commit_index: self.commit_index,
                    })),
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected create stream response: {other:?}"
                    ))),
                }
            }
            // Same flow as `CreateStream`, except the payload is forwarded to
            // `StreamCommand::CreateExternal` as-is.
            GroupWriteCommand::CreateExternal {
                stream_id,
                content_type,
                initial_payload,
                close_after,
                stream_seq,
                producer,
                stream_ttl_seconds,
                stream_expires_at_ms,
                forked_from,
                fork_offset,
                now_ms,
            } => {
                ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
                let response = self.state_machine.apply(StreamCommand::CreateExternal {
                    stream_id,
                    content_type,
                    initial_payload,
                    close_after,
                    stream_seq,
                    producer,
                    stream_ttl_seconds,
                    stream_expires_at_ms,
                    forked_from,
                    fork_offset,
                    now_ms,
                });
                match response {
                    StreamResponse::Created {
                        next_offset,
                        closed,
                        ..
                    } => {
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
                            placement,
                            next_offset,
                            closed,
                            already_exists: false,
                            group_commit_index: self.commit_index,
                        }))
                    }
                    StreamResponse::AlreadyExists {
                        next_offset,
                        closed,
                        ..
                    } => Ok(GroupWriteResponse::CreateStream(CreateStreamResponse {
                        placement,
                        next_offset,
                        closed,
                        already_exists: true,
                        group_commit_index: self.commit_index,
                    })),
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected create external stream response: {other:?}"
                    ))),
                }
            }
            // Single-payload appends are delegated to `append_payload`.
            GroupWriteCommand::Append {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => self
                .append_payload(
                    AppendPayloadInput {
                        stream_id,
                        content_type: Some(&content_type),
                        payload: &payload,
                        close_after,
                        stream_seq,
                        producer,
                        now_ms,
                    },
                    placement,
                )
                .map(GroupWriteResponse::Append),
            GroupWriteCommand::AppendExternal {
                stream_id,
                content_type,
                payload,
                close_after,
                stream_seq,
                producer,
                now_ms,
            } => {
                let response = self.state_machine.apply(StreamCommand::AppendExternal {
                    // Cloned because `stream_id` is still needed as the
                    // append-count key below.
                    stream_id: stream_id.clone(),
                    content_type: Some(content_type),
                    payload,
                    close_after,
                    stream_seq,
                    producer,
                    now_ms,
                });
                match response {
                    StreamResponse::Appended {
                        offset,
                        next_offset,
                        closed,
                        deduplicated,
                        producer,
                        ..
                    } => {
                        // The count entry is created (at 0) even for a
                        // deduplicated append; only real appends bump counters.
                        let stream_append_count =
                            self.stream_append_counts.entry(stream_id).or_insert(0);
                        if !deduplicated {
                            self.commit_index += 1;
                            *stream_append_count += 1;
                        }
                        Ok(GroupWriteResponse::Append(AppendResponse {
                            placement,
                            start_offset: offset,
                            next_offset,
                            stream_append_count: *stream_append_count,
                            group_commit_index: self.commit_index,
                            closed,
                            deduplicated,
                            producer,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected append external response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::AppendBatch {
                stream_id,
                content_type,
                payloads,
                producer,
                now_ms,
            } => {
                // Producer batches go through one `append_batch_borrowed` call;
                // a failure there fails the whole command.
                if producer.is_some() {
                    let payload_refs = payloads.iter().map(Bytes::as_ref).collect::<Vec<_>>();
                    let batch = self
                        .state_machine
                        .append_batch_borrowed(
                            stream_id.clone(),
                            Some(&content_type),
                            &payload_refs,
                            producer,
                            now_ms,
                        )
                        .map_err(stream_response_error)?;
                    // Capture the counters before bumping them so each item can
                    // report its own position-based index below.
                    let old_commit_index = self.commit_index;
                    let old_append_count = *self.stream_append_counts.get(&stream_id).unwrap_or(&0);
                    if !batch.deduplicated {
                        let count = u64::try_from(batch.items.len()).expect("item count fits u64");
                        self.commit_index += count;
                        *self.stream_append_counts.entry(stream_id).or_insert(0) += count;
                    }
                    let items = batch
                        .items
                        .into_iter()
                        .enumerate()
                        .map(|(index, item)| {
                            // 1-based position of the item inside the batch.
                            let item_index = u64::try_from(index + 1).expect("item index fits u64");
                            Ok(AppendResponse {
                                placement,
                                start_offset: item.offset,
                                next_offset: item.next_offset,
                                stream_append_count: if item.deduplicated {
                                    old_append_count
                                } else {
                                    old_append_count + item_index
                                },
                                group_commit_index: if item.deduplicated {
                                    old_commit_index
                                } else {
                                    old_commit_index + item_index
                                },
                                closed: item.closed,
                                deduplicated: item.deduplicated,
                                // Per-item responses on this path carry no
                                // producer information.
                                producer: None,
                            })
                        })
                        .collect();
                    return Ok(GroupWriteResponse::AppendBatch(GroupAppendBatchResponse {
                        placement,
                        items,
                    }));
                }

                // Without a producer each payload is appended independently and
                // yields its own `Result`; one failure does not stop the rest.
                let mut items = Vec::with_capacity(payloads.len());
                for payload in payloads {
                    if payload.is_empty() {
                        items.push(Err(GroupEngineError::stream(
                            StreamErrorCode::EmptyAppend,
                            "append payload must be non-empty",
                        )));
                        continue;
                    }
                    items.push(self.append_payload(
                        AppendPayloadInput {
                            stream_id: stream_id.clone(),
                            content_type: Some(&content_type),
                            payload: &payload,
                            close_after: false,
                            stream_seq: None,
                            producer: None,
                            now_ms,
                        },
                        placement,
                    ));
                }
                Ok(GroupWriteResponse::AppendBatch(GroupAppendBatchResponse {
                    placement,
                    items,
                }))
            }
            GroupWriteCommand::PublishSnapshot {
                stream_id,
                snapshot_offset,
                content_type,
                payload,
                now_ms,
            } => {
                let response = self.state_machine.apply(StreamCommand::PublishSnapshot {
                    stream_id,
                    snapshot_offset,
                    content_type,
                    payload: payload.to_vec(),
                    now_ms,
                });
                match response {
                    // The offset echoed back is the one reported by the state
                    // machine, which shadows the requested `snapshot_offset`.
                    StreamResponse::SnapshotPublished { snapshot_offset } => {
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::PublishSnapshot(
                            PublishSnapshotResponse {
                                placement,
                                snapshot_offset,
                                group_commit_index: self.commit_index,
                            },
                        ))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected publish snapshot response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::TouchStreamAccess {
                stream_id,
                now_ms,
                renew_ttl,
            } => {
                let response = self.state_machine.apply(StreamCommand::TouchStreamAccess {
                    stream_id,
                    now_ms,
                    renew_ttl,
                });
                match response {
                    StreamResponse::Accessed { changed, expired } => {
                        // A touch that neither changed nor expired the stream
                        // does not count as a commit.
                        if changed || expired {
                            self.commit_index += 1;
                        }
                        Ok(GroupWriteResponse::TouchStreamAccess(
                            TouchStreamAccessResponse {
                                placement,
                                changed,
                                expired,
                                group_commit_index: self.commit_index,
                            },
                        ))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected touch stream access response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::AddForkRef { stream_id, now_ms } => {
                let response = self
                    .state_machine
                    .apply(StreamCommand::AddForkRef { stream_id, now_ms });
                match response {
                    StreamResponse::ForkRefAdded { fork_ref_count } => {
                        self.commit_index += 1;
                        // Adding a reference never deletes anything, so the
                        // deletion-related fields are fixed.
                        Ok(GroupWriteResponse::AddForkRef(ForkRefResponse {
                            placement,
                            fork_ref_count,
                            hard_deleted: false,
                            parent_to_release: None,
                            group_commit_index: self.commit_index,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected add fork ref response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::ReleaseForkRef { stream_id } => {
                let response = self
                    .state_machine
                    .apply(StreamCommand::ReleaseForkRef { stream_id });
                match response {
                    // `hard_deleted` and `parent_to_release` are passed through
                    // from the state machine unchanged.
                    StreamResponse::ForkRefReleased {
                        hard_deleted,
                        fork_ref_count,
                        parent_to_release,
                    } => {
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::ReleaseForkRef(ForkRefResponse {
                            placement,
                            fork_ref_count,
                            hard_deleted,
                            parent_to_release,
                            group_commit_index: self.commit_index,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected release fork ref response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::FlushCold { stream_id, chunk } => {
                let response = self
                    .state_machine
                    .apply(StreamCommand::FlushCold { stream_id, chunk });
                match response {
                    StreamResponse::ColdFlushed { hot_start_offset } => {
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::FlushCold(FlushColdResponse {
                            placement,
                            hot_start_offset,
                            group_commit_index: self.commit_index,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected flush cold response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::CloseStream {
                stream_id,
                stream_seq,
                producer,
                now_ms,
            } => {
                let response = self.state_machine.apply(StreamCommand::Close {
                    stream_id,
                    stream_seq,
                    producer,
                    now_ms,
                });
                match response {
                    StreamResponse::Closed {
                        next_offset,
                        deduplicated,
                        ..
                    } => {
                        // A deduplicated close leaves the commit index as-is.
                        if !deduplicated {
                            self.commit_index += 1;
                        }
                        Ok(GroupWriteResponse::CloseStream(CloseStreamResponse {
                            placement,
                            next_offset,
                            group_commit_index: self.commit_index,
                            deduplicated,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected close stream response: {other:?}"
                    ))),
                }
            }
            GroupWriteCommand::DeleteStream { stream_id } => {
                let response = self
                    .state_machine
                    .apply(StreamCommand::DeleteStream { stream_id });
                match response {
                    StreamResponse::Deleted {
                        hard_deleted,
                        parent_to_release,
                    } => {
                        self.commit_index += 1;
                        Ok(GroupWriteResponse::DeleteStream(DeleteStreamResponse {
                            placement,
                            group_commit_index: self.commit_index,
                            hard_deleted,
                            parent_to_release,
                        }))
                    }
                    StreamResponse::Error {
                        code,
                        message,
                        next_offset,
                    } => Err(GroupEngineError::stream_with_next_offset(
                        code,
                        message,
                        next_offset,
                    )),
                    other => Err(GroupEngineError::new(format!(
                        "unexpected delete stream response: {other:?}"
                    ))),
                }
            }
            // Nested commands are applied in order; their individual results
            // are collected instead of short-circuiting on the first error.
            GroupWriteCommand::Batch { commands } => Ok(GroupWriteResponse::Batch(
                self.apply_committed_write_batch(commands, placement),
            )),
        }
    }
614
615    pub(crate) fn cold_hot_backlog_for(
616        &self,
617        stream_id: BucketStreamId,
618    ) -> Result<ColdHotBacklog, GroupEngineError> {
619        let stream_hot_bytes = self.state_machine.hot_payload_len(&stream_id).unwrap_or(0);
620        Ok(ColdHotBacklog {
621            stream_id,
622            stream_hot_bytes,
623            group_hot_bytes: self.state_machine.total_hot_payload_bytes(),
624        })
625    }
626
627    pub(crate) fn enforce_cold_write_admission(
628        &self,
629        stream_id: &BucketStreamId,
630        admission: ColdWriteAdmission,
631        before_group_hot_bytes: u64,
632        after_group_hot_bytes: u64,
633        mutating: bool,
634    ) -> Result<(), GroupEngineError> {
635        let Some(limit) = admission.max_hot_bytes_per_group else {
636            return Ok(());
637        };
638        if !mutating || after_group_hot_bytes <= limit {
639            return Ok(());
640        }
641        Err(GroupEngineError::new(format!(
642            "ColdBackpressure: stream '{stream_id}' would raise group hot bytes from {before_group_hot_bytes} to {after_group_hot_bytes}, above limit {limit}"
643        )))
644    }
645
646    pub(crate) fn create_stream_with_admission_inner(
647        &mut self,
648        request: CreateStreamRequest,
649        placement: ShardPlacement,
650        admission: ColdWriteAdmission,
651    ) -> Result<CreateStreamResponse, GroupEngineError> {
652        let stream_id = request.stream_id.clone();
653        let command = GroupWriteCommand::from(request);
654        let before = self.state_machine.total_hot_payload_bytes();
655        let mut preview = self.clone();
656        let response = match preview.apply_committed_write(command, placement)? {
657            GroupWriteResponse::CreateStream(response) => response,
658            other => {
659                return Err(GroupEngineError::new(format!(
660                    "unexpected create stream write response: {other:?}"
661                )));
662            }
663        };
664        preview.enforce_cold_write_admission(
665            &stream_id,
666            admission,
667            before,
668            preview.state_machine.total_hot_payload_bytes(),
669            !response.already_exists,
670        )?;
671        *self = preview;
672        Ok(response)
673    }
674
675    pub(crate) fn append_with_admission_inner(
676        &mut self,
677        request: AppendRequest,
678        placement: ShardPlacement,
679        admission: ColdWriteAdmission,
680    ) -> Result<AppendResponse, GroupEngineError> {
681        let stream_id = request.stream_id.clone();
682        let command = GroupWriteCommand::from(request);
683        let before = self.state_machine.total_hot_payload_bytes();
684        let mut preview = self.clone();
685        let response = match preview.apply_committed_write(command, placement)? {
686            GroupWriteResponse::Append(response) => response,
687            other => {
688                return Err(GroupEngineError::new(format!(
689                    "unexpected append write response: {other:?}"
690                )));
691            }
692        };
693        preview.enforce_cold_write_admission(
694            &stream_id,
695            admission,
696            before,
697            preview.state_machine.total_hot_payload_bytes(),
698            !response.deduplicated,
699        )?;
700        *self = preview;
701        Ok(response)
702    }
703
704    pub(crate) fn append_batch_with_admission_inner(
705        &mut self,
706        request: AppendBatchRequest,
707        placement: ShardPlacement,
708        admission: ColdWriteAdmission,
709    ) -> Result<GroupAppendBatchResponse, GroupEngineError> {
710        let stream_id = request.stream_id.clone();
711        let command = GroupWriteCommand::from(request);
712        let before = self.state_machine.total_hot_payload_bytes();
713        let mut preview = self.clone();
714        let response = match preview.apply_committed_write(command, placement)? {
715            GroupWriteResponse::AppendBatch(response) => response,
716            other => {
717                return Err(GroupEngineError::new(format!(
718                    "unexpected append batch write response: {other:?}"
719                )));
720            }
721        };
722        let mutating = response
723            .items
724            .iter()
725            .any(|item| matches!(item, Ok(response) if !response.deduplicated));
726        preview.enforce_cold_write_admission(
727            &stream_id,
728            admission,
729            before,
730            preview.state_machine.total_hot_payload_bytes(),
731            mutating,
732        )?;
733        *self = preview;
734        Ok(response)
735    }
736
737    pub fn access_requires_write(
738        &self,
739        stream_id: &BucketStreamId,
740        now_ms: u64,
741        renew_ttl: bool,
742    ) -> Result<bool, GroupEngineError> {
743        self.state_machine
744            .access_requires_write(stream_id, now_ms, renew_ttl)
745            .map_err(stream_response_error)
746    }
747
748    pub(crate) fn apply_access_command(
749        &mut self,
750        stream_id: BucketStreamId,
751        now_ms: u64,
752        renew_ttl: bool,
753        placement: ShardPlacement,
754    ) -> Result<TouchStreamAccessResponse, GroupEngineError> {
755        match self.apply_committed_write(
756            GroupWriteCommand::TouchStreamAccess {
757                stream_id,
758                now_ms,
759                renew_ttl,
760            },
761            placement,
762        )? {
763            GroupWriteResponse::TouchStreamAccess(response) => Ok(response),
764            other => Err(GroupEngineError::new(format!(
765                "unexpected touch stream access write response: {other:?}"
766            ))),
767        }
768    }
769
770    pub(crate) fn ensure_stream_access(
771        &mut self,
772        stream_id: &BucketStreamId,
773        now_ms: u64,
774        renew_ttl: bool,
775        placement: ShardPlacement,
776    ) -> Result<Option<TouchStreamAccessResponse>, GroupEngineError> {
777        if !self.access_requires_write(stream_id, now_ms, renew_ttl)? {
778            return Ok(None);
779        }
780        let response =
781            self.apply_access_command(stream_id.clone(), now_ms, renew_ttl, placement)?;
782        if response.expired {
783            return Err(GroupEngineError::stream(
784                StreamErrorCode::StreamNotFound,
785                format!("stream '{stream_id}' does not exist"),
786            ));
787        }
788        Ok(Some(response))
789    }
790
791    pub fn apply_committed_write_batch(
792        &mut self,
793        commands: Vec<GroupWriteCommand>,
794        placement: ShardPlacement,
795    ) -> Vec<Result<GroupWriteResponse, GroupEngineError>> {
796        commands
797            .into_iter()
798            .map(|command| self.apply_committed_write(command, placement))
799            .collect()
800    }
801
802    pub(crate) fn apply_replayed_write_command(
803        &mut self,
804        command: GroupWriteCommand,
805    ) -> Result<(), GroupEngineError> {
806        let placement = ShardPlacement {
807            core_id: CoreId(0),
808            shard_id: ShardId(0),
809            raft_group_id: RaftGroupId(0),
810        };
811        self.apply_committed_write(command, placement).map(|_| ())
812    }
813
814    pub(crate) fn apply_replayed_command(
815        &mut self,
816        command: StreamCommand,
817    ) -> Result<(), GroupEngineError> {
818        match command {
819            StreamCommand::CreateBucket { bucket_id } => {
820                match self
821                    .state_machine
822                    .apply(StreamCommand::CreateBucket { bucket_id })
823                {
824                    StreamResponse::BucketCreated { .. } => {
825                        self.commit_index += 1;
826                        Ok(())
827                    }
828                    StreamResponse::BucketAlreadyExists { .. } => Ok(()),
829                    StreamResponse::Error {
830                        code,
831                        message,
832                        next_offset,
833                    } => Err(GroupEngineError::stream_with_next_offset(
834                        code,
835                        message,
836                        next_offset,
837                    )),
838                    other => Err(GroupEngineError::new(format!(
839                        "unexpected replay create bucket response: {other:?}"
840                    ))),
841                }
842            }
843            StreamCommand::DeleteBucket { bucket_id } => {
844                match self
845                    .state_machine
846                    .apply(StreamCommand::DeleteBucket { bucket_id })
847                {
848                    StreamResponse::BucketDeleted { .. } => {
849                        self.commit_index += 1;
850                        Ok(())
851                    }
852                    StreamResponse::Error {
853                        code,
854                        message,
855                        next_offset,
856                    } => Err(GroupEngineError::stream_with_next_offset(
857                        code,
858                        message,
859                        next_offset,
860                    )),
861                    other => Err(GroupEngineError::new(format!(
862                        "unexpected replay delete bucket response: {other:?}"
863                    ))),
864                }
865            }
866            StreamCommand::CreateStream {
867                stream_id,
868                content_type,
869                initial_payload,
870                close_after,
871                stream_seq,
872                producer,
873                stream_ttl_seconds,
874                stream_expires_at_ms,
875                forked_from,
876                fork_offset,
877                now_ms,
878            } => {
879                ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
880                let response = self.state_machine.apply(StreamCommand::CreateStream {
881                    stream_id,
882                    content_type,
883                    initial_payload,
884                    close_after,
885                    stream_seq,
886                    producer,
887                    stream_ttl_seconds,
888                    stream_expires_at_ms,
889                    forked_from,
890                    fork_offset,
891                    now_ms,
892                });
893                match response {
894                    StreamResponse::Created { .. } => {
895                        self.commit_index += 1;
896                        Ok(())
897                    }
898                    StreamResponse::AlreadyExists { .. } => Ok(()),
899                    StreamResponse::Error {
900                        code,
901                        message,
902                        next_offset,
903                    } => Err(GroupEngineError::stream_with_next_offset(
904                        code,
905                        message,
906                        next_offset,
907                    )),
908                    other => Err(GroupEngineError::new(format!(
909                        "unexpected replay create stream response: {other:?}"
910                    ))),
911                }
912            }
913            StreamCommand::CreateExternal {
914                stream_id,
915                content_type,
916                initial_payload,
917                close_after,
918                stream_seq,
919                producer,
920                stream_ttl_seconds,
921                stream_expires_at_ms,
922                forked_from,
923                fork_offset,
924                now_ms,
925            } => {
926                ensure_bucket_exists(&mut self.state_machine, &stream_id)?;
927                let response = self.state_machine.apply(StreamCommand::CreateExternal {
928                    stream_id,
929                    content_type,
930                    initial_payload,
931                    close_after,
932                    stream_seq,
933                    producer,
934                    stream_ttl_seconds,
935                    stream_expires_at_ms,
936                    forked_from,
937                    fork_offset,
938                    now_ms,
939                });
940                match response {
941                    StreamResponse::Created { .. } => {
942                        self.commit_index += 1;
943                        Ok(())
944                    }
945                    StreamResponse::AlreadyExists { .. } => Ok(()),
946                    StreamResponse::Error {
947                        code,
948                        message,
949                        next_offset,
950                    } => Err(GroupEngineError::stream_with_next_offset(
951                        code,
952                        message,
953                        next_offset,
954                    )),
955                    other => Err(GroupEngineError::new(format!(
956                        "unexpected replay external create stream response: {other:?}"
957                    ))),
958                }
959            }
960            StreamCommand::Append {
961                stream_id,
962                content_type,
963                payload,
964                close_after,
965                stream_seq,
966                producer,
967                now_ms,
968            } => {
969                let stream_count_key = stream_id.clone();
970                let response = self.state_machine.apply(StreamCommand::Append {
971                    stream_id,
972                    content_type,
973                    payload,
974                    close_after,
975                    stream_seq,
976                    producer,
977                    now_ms,
978                });
979                match response {
980                    StreamResponse::Appended { deduplicated, .. } => {
981                        if !deduplicated {
982                            self.commit_index += 1;
983                            *self
984                                .stream_append_counts
985                                .entry(stream_count_key)
986                                .or_insert(0) += 1;
987                        }
988                        Ok(())
989                    }
990                    StreamResponse::Closed { deduplicated, .. } => {
991                        if !deduplicated {
992                            self.commit_index += 1;
993                        }
994                        Ok(())
995                    }
996                    StreamResponse::Error {
997                        code,
998                        message,
999                        next_offset,
1000                    } => Err(GroupEngineError::stream_with_next_offset(
1001                        code,
1002                        message,
1003                        next_offset,
1004                    )),
1005                    other => Err(GroupEngineError::new(format!(
1006                        "unexpected replay append response: {other:?}"
1007                    ))),
1008                }
1009            }
1010            StreamCommand::AppendExternal {
1011                stream_id,
1012                content_type,
1013                payload,
1014                close_after,
1015                stream_seq,
1016                producer,
1017                now_ms,
1018            } => {
1019                let stream_count_key = stream_id.clone();
1020                let response = self.state_machine.apply(StreamCommand::AppendExternal {
1021                    stream_id,
1022                    content_type,
1023                    payload,
1024                    close_after,
1025                    stream_seq,
1026                    producer,
1027                    now_ms,
1028                });
1029                match response {
1030                    StreamResponse::Appended { deduplicated, .. } => {
1031                        if !deduplicated {
1032                            self.commit_index += 1;
1033                            *self
1034                                .stream_append_counts
1035                                .entry(stream_count_key)
1036                                .or_insert(0) += 1;
1037                        }
1038                        Ok(())
1039                    }
1040                    StreamResponse::Error {
1041                        code,
1042                        message,
1043                        next_offset,
1044                    } => Err(GroupEngineError::stream_with_next_offset(
1045                        code,
1046                        message,
1047                        next_offset,
1048                    )),
1049                    other => Err(GroupEngineError::new(format!(
1050                        "unexpected replay external append response: {other:?}"
1051                    ))),
1052                }
1053            }
1054            StreamCommand::AppendBatch {
1055                stream_id,
1056                content_type,
1057                payloads,
1058                producer,
1059                now_ms,
1060            } => {
1061                let stream_count_key = stream_id.clone();
1062                let payload_refs = payloads.iter().map(Vec::as_slice).collect::<Vec<_>>();
1063                let response = self
1064                    .state_machine
1065                    .append_batch_borrowed(
1066                        stream_id,
1067                        content_type.as_deref(),
1068                        &payload_refs,
1069                        producer,
1070                        now_ms,
1071                    )
1072                    .map_err(stream_response_error)?;
1073                if !response.deduplicated {
1074                    let count = u64::try_from(response.items.len()).expect("item count fits u64");
1075                    self.commit_index += count;
1076                    *self
1077                        .stream_append_counts
1078                        .entry(stream_count_key)
1079                        .or_insert(0) += count;
1080                }
1081                Ok(())
1082            }
1083            StreamCommand::PublishSnapshot {
1084                stream_id,
1085                snapshot_offset,
1086                content_type,
1087                payload,
1088                now_ms,
1089            } => {
1090                let response = self.state_machine.apply(StreamCommand::PublishSnapshot {
1091                    stream_id,
1092                    snapshot_offset,
1093                    content_type,
1094                    payload,
1095                    now_ms,
1096                });
1097                match response {
1098                    StreamResponse::SnapshotPublished { .. } => {
1099                        self.commit_index += 1;
1100                        Ok(())
1101                    }
1102                    StreamResponse::Error {
1103                        code,
1104                        message,
1105                        next_offset,
1106                    } => Err(GroupEngineError::stream_with_next_offset(
1107                        code,
1108                        message,
1109                        next_offset,
1110                    )),
1111                    other => Err(GroupEngineError::new(format!(
1112                        "unexpected replay publish snapshot response: {other:?}"
1113                    ))),
1114                }
1115            }
1116            StreamCommand::TouchStreamAccess {
1117                stream_id,
1118                now_ms,
1119                renew_ttl,
1120            } => {
1121                let response = self.state_machine.apply(StreamCommand::TouchStreamAccess {
1122                    stream_id,
1123                    now_ms,
1124                    renew_ttl,
1125                });
1126                match response {
1127                    StreamResponse::Accessed { changed, expired } => {
1128                        if changed || expired {
1129                            self.commit_index += 1;
1130                        }
1131                        Ok(())
1132                    }
1133                    StreamResponse::Error {
1134                        code,
1135                        message,
1136                        next_offset,
1137                    } => Err(GroupEngineError::stream_with_next_offset(
1138                        code,
1139                        message,
1140                        next_offset,
1141                    )),
1142                    other => Err(GroupEngineError::new(format!(
1143                        "unexpected replay touch stream access response: {other:?}"
1144                    ))),
1145                }
1146            }
1147            StreamCommand::AddForkRef { stream_id, now_ms } => {
1148                let response = self
1149                    .state_machine
1150                    .apply(StreamCommand::AddForkRef { stream_id, now_ms });
1151                match response {
1152                    StreamResponse::ForkRefAdded { .. } => {
1153                        self.commit_index += 1;
1154                        Ok(())
1155                    }
1156                    StreamResponse::Error {
1157                        code,
1158                        message,
1159                        next_offset,
1160                    } => Err(GroupEngineError::stream_with_next_offset(
1161                        code,
1162                        message,
1163                        next_offset,
1164                    )),
1165                    other => Err(GroupEngineError::new(format!(
1166                        "unexpected replay add fork ref response: {other:?}"
1167                    ))),
1168                }
1169            }
1170            StreamCommand::ReleaseForkRef { stream_id } => {
1171                let response = self
1172                    .state_machine
1173                    .apply(StreamCommand::ReleaseForkRef { stream_id });
1174                match response {
1175                    StreamResponse::ForkRefReleased { .. } => {
1176                        self.commit_index += 1;
1177                        Ok(())
1178                    }
1179                    StreamResponse::Error {
1180                        code,
1181                        message,
1182                        next_offset,
1183                    } => Err(GroupEngineError::stream_with_next_offset(
1184                        code,
1185                        message,
1186                        next_offset,
1187                    )),
1188                    other => Err(GroupEngineError::new(format!(
1189                        "unexpected replay release fork ref response: {other:?}"
1190                    ))),
1191                }
1192            }
1193            StreamCommand::FlushCold { stream_id, chunk } => {
1194                let response = self
1195                    .state_machine
1196                    .apply(StreamCommand::FlushCold { stream_id, chunk });
1197                match response {
1198                    StreamResponse::ColdFlushed { .. } => {
1199                        self.commit_index += 1;
1200                        Ok(())
1201                    }
1202                    StreamResponse::Error {
1203                        code,
1204                        message,
1205                        next_offset,
1206                    } => Err(GroupEngineError::stream_with_next_offset(
1207                        code,
1208                        message,
1209                        next_offset,
1210                    )),
1211                    other => Err(GroupEngineError::new(format!(
1212                        "unexpected replay flush cold response: {other:?}"
1213                    ))),
1214                }
1215            }
1216            StreamCommand::Close {
1217                stream_id,
1218                stream_seq,
1219                producer,
1220                now_ms,
1221            } => {
1222                let response = self.state_machine.apply(StreamCommand::Close {
1223                    stream_id,
1224                    stream_seq,
1225                    producer,
1226                    now_ms,
1227                });
1228                match response {
1229                    StreamResponse::Closed { deduplicated, .. } => {
1230                        if !deduplicated {
1231                            self.commit_index += 1;
1232                        }
1233                        Ok(())
1234                    }
1235                    StreamResponse::Error {
1236                        code,
1237                        message,
1238                        next_offset,
1239                    } => Err(GroupEngineError::stream_with_next_offset(
1240                        code,
1241                        message,
1242                        next_offset,
1243                    )),
1244                    other => Err(GroupEngineError::new(format!(
1245                        "unexpected replay close stream response: {other:?}"
1246                    ))),
1247                }
1248            }
1249            StreamCommand::DeleteStream { stream_id } => {
1250                let response = self
1251                    .state_machine
1252                    .apply(StreamCommand::DeleteStream { stream_id });
1253                match response {
1254                    StreamResponse::Deleted { .. } => {
1255                        self.commit_index += 1;
1256                        Ok(())
1257                    }
1258                    StreamResponse::Error {
1259                        code,
1260                        message,
1261                        next_offset,
1262                    } => Err(GroupEngineError::stream_with_next_offset(
1263                        code,
1264                        message,
1265                        next_offset,
1266                    )),
1267                    other => Err(GroupEngineError::new(format!(
1268                        "unexpected replay delete stream response: {other:?}"
1269                    ))),
1270                }
1271            }
1272        }
1273    }
1274
1275    pub(crate) fn append_payload(
1276        &mut self,
1277        input: AppendPayloadInput<'_>,
1278        placement: ShardPlacement,
1279    ) -> Result<AppendResponse, GroupEngineError> {
1280        let AppendPayloadInput {
1281            stream_id,
1282            content_type,
1283            payload,
1284            close_after,
1285            stream_seq,
1286            producer,
1287            now_ms,
1288        } = input;
1289        let stream_count_key = stream_id.clone();
1290        let response = self.state_machine.append_borrowed(AppendStreamInput {
1291            stream_id,
1292            content_type,
1293            payload,
1294            close_after,
1295            stream_seq,
1296            producer,
1297            now_ms,
1298        });
1299        match response {
1300            StreamResponse::Appended {
1301                offset,
1302                next_offset,
1303                closed,
1304                deduplicated,
1305                producer,
1306                ..
1307            } => {
1308                let stream_append_count = self
1309                    .stream_append_counts
1310                    .entry(stream_count_key)
1311                    .or_insert(0);
1312                if !deduplicated {
1313                    self.commit_index += 1;
1314                    *stream_append_count += 1;
1315                }
1316                Ok(AppendResponse {
1317                    placement,
1318                    start_offset: offset,
1319                    next_offset,
1320                    stream_append_count: *stream_append_count,
1321                    group_commit_index: self.commit_index,
1322                    closed,
1323                    deduplicated,
1324                    producer,
1325                })
1326            }
1327            StreamResponse::Error {
1328                code,
1329                message,
1330                next_offset,
1331            } => Err(GroupEngineError::stream_with_next_offset(
1332                code,
1333                message,
1334                next_offset,
1335            )),
1336            other => Err(GroupEngineError::new(format!(
1337                "unexpected append response: {other:?}"
1338            ))),
1339        }
1340    }
1341
1342    pub fn read_stream_plan(
1343        &mut self,
1344        request: &ReadStreamRequest,
1345        placement: ShardPlacement,
1346    ) -> Result<StreamReadPlan, GroupEngineError> {
1347        self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1348        self.read_stream_plan_after_access(request)
1349    }
1350
1351    pub fn read_stream_plan_after_access(
1352        &self,
1353        request: &ReadStreamRequest,
1354    ) -> Result<StreamReadPlan, GroupEngineError> {
1355        self.state_machine
1356            .read_plan_at(
1357                &request.stream_id,
1358                request.offset,
1359                request.max_len,
1360                request.now_ms,
1361            )
1362            .map_err(stream_response_error)
1363    }
1364
1365    pub fn head_stream_after_access(
1366        &mut self,
1367        request: &HeadStreamRequest,
1368        placement: ShardPlacement,
1369    ) -> Result<HeadStreamResponse, GroupEngineError> {
1370        let Some(metadata) = self
1371            .state_machine
1372            .head_at(&request.stream_id, request.now_ms)
1373        else {
1374            return Err(GroupEngineError::stream(
1375                StreamErrorCode::StreamNotFound,
1376                format!("stream '{}' does not exist", request.stream_id),
1377            ));
1378        };
1379        Ok(HeadStreamResponse {
1380            placement,
1381            content_type: metadata.content_type.clone(),
1382            tail_offset: metadata.tail_offset,
1383            closed: metadata.status == ursula_stream::StreamStatus::Closed,
1384            stream_ttl_seconds: metadata.stream_ttl_seconds,
1385            stream_expires_at_ms: metadata.stream_expires_at_ms,
1386            snapshot_offset: self
1387                .state_machine
1388                .latest_snapshot(&request.stream_id)
1389                .map_err(stream_response_error)?
1390                .map(|snapshot| snapshot.offset),
1391        })
1392    }
1393
1394    pub async fn read_payload_from_plan(
1395        cold_store: Option<&ColdStoreHandle>,
1396        stream_id: &BucketStreamId,
1397        plan: &StreamReadPlan,
1398    ) -> Result<Vec<u8>, GroupEngineError> {
1399        let mut payload = Vec::new();
1400        for segment in &plan.segments {
1401            match segment {
1402                StreamReadSegment::Hot(bytes) => payload.extend_from_slice(bytes),
1403                StreamReadSegment::Object(segment) => {
1404                    let Some(cold_store) = cold_store else {
1405                        return Err(GroupEngineError::stream_with_next_offset(
1406                            StreamErrorCode::InvalidColdFlush,
1407                            format!("stream '{stream_id}' read requires object payload store"),
1408                            Some(plan.next_offset),
1409                        ));
1410                    };
1411                    let bytes = cold_store
1412                        .read_object_range(&segment.object, segment.read_start_offset, segment.len)
1413                        .await
1414                        .map_err(|err| GroupEngineError::new(err.to_string()))?;
1415                    payload.extend_from_slice(&bytes);
1416                }
1417            }
1418        }
1419        Ok(payload)
1420    }
1421
1422    pub(crate) async fn read_own_payload_from_plan(
1423        &self,
1424        stream_id: &BucketStreamId,
1425        plan: &StreamReadPlan,
1426    ) -> Result<Vec<u8>, GroupEngineError> {
1427        Self::read_payload_from_plan(self.cold_store.as_ref(), stream_id, plan).await
1428    }
1429
1430    pub(crate) async fn bootstrap_updates(
1431        &self,
1432        stream_id: &BucketStreamId,
1433        records: &[StreamMessageRecord],
1434        content_type: &str,
1435        now_ms: u64,
1436    ) -> Result<Vec<BootstrapUpdate>, GroupEngineError> {
1437        let mut updates = Vec::with_capacity(records.len());
1438        for record in records {
1439            let len = usize::try_from(record.end_offset - record.start_offset).map_err(|_| {
1440                GroupEngineError::stream(
1441                    StreamErrorCode::InvalidSnapshot,
1442                    format!(
1443                        "bootstrap message [{}..{}) for stream '{stream_id}' is too large",
1444                        record.start_offset, record.end_offset
1445                    ),
1446                )
1447            })?;
1448            let plan = self
1449                .state_machine
1450                .read_plan_at(stream_id, record.start_offset, len, now_ms)
1451                .map_err(stream_response_error)?;
1452            let payload = self.read_own_payload_from_plan(stream_id, &plan).await?;
1453            updates.push(BootstrapUpdate {
1454                start_offset: record.start_offset,
1455                next_offset: record.end_offset,
1456                content_type: content_type.to_owned(),
1457                payload,
1458            });
1459        }
1460        Ok(updates)
1461    }
1462
1463    pub(crate) fn build_snapshot(&self, placement: ShardPlacement) -> GroupSnapshot {
1464        GroupSnapshot {
1465            placement,
1466            group_commit_index: self.commit_index,
1467            stream_snapshot: self.state_machine.snapshot(),
1468            stream_append_counts: self.stream_append_counts_snapshot(),
1469        }
1470    }
1471
1472    pub(crate) fn stream_append_counts_snapshot(&self) -> Vec<StreamAppendCount> {
1473        let mut counts = self
1474            .stream_append_counts
1475            .iter()
1476            .map(|(stream_id, append_count)| StreamAppendCount {
1477                stream_id: stream_id.clone(),
1478                append_count: *append_count,
1479            })
1480            .collect::<Vec<_>>();
1481        counts.sort_by(|left, right| compare_stream_ids(&left.stream_id, &right.stream_id));
1482        counts
1483    }
1484
1485    pub(crate) fn install_snapshot_inner(
1486        &mut self,
1487        snapshot: GroupSnapshot,
1488    ) -> Result<(), GroupEngineError> {
1489        let GroupSnapshot {
1490            placement: _,
1491            group_commit_index,
1492            stream_snapshot,
1493            stream_append_counts,
1494        } = snapshot;
1495        self.install_snapshot_parts(group_commit_index, stream_snapshot, stream_append_counts)
1496    }
1497
1498    pub(crate) fn install_snapshot_parts(
1499        &mut self,
1500        group_commit_index: u64,
1501        stream_snapshot: StreamSnapshot,
1502        stream_append_counts: Vec<StreamAppendCount>,
1503    ) -> Result<(), GroupEngineError> {
1504        let stream_ids = stream_snapshot
1505            .streams
1506            .iter()
1507            .map(|entry| entry.metadata.stream_id.clone())
1508            .collect::<HashSet<_>>();
1509        let state_machine = StreamStateMachine::restore(stream_snapshot)
1510            .map_err(|err| GroupEngineError::new(format!("restore stream snapshot: {err}")))?;
1511        let stream_append_counts = restore_stream_append_counts(stream_append_counts, &stream_ids)?;
1512
1513        self.commit_index = group_commit_index;
1514        self.state_machine = state_machine;
1515        self.stream_append_counts = stream_append_counts;
1516        Ok(())
1517    }
1518}
1519
1520impl GroupEngine for InMemoryGroupEngine {
1521    fn create_stream<'a>(
1522        &'a mut self,
1523        request: CreateStreamRequest,
1524        placement: ShardPlacement,
1525    ) -> GroupCreateStreamFuture<'a> {
1526        let command = GroupWriteCommand::from(request);
1527        Box::pin(async move {
1528            match self.apply_committed_write(command, placement)? {
1529                GroupWriteResponse::CreateStream(response) => Ok(response),
1530                other => Err(GroupEngineError::new(format!(
1531                    "unexpected create stream write response: {other:?}"
1532                ))),
1533            }
1534        })
1535    }
1536
1537    fn create_stream_with_cold_admission<'a>(
1538        &'a mut self,
1539        request: CreateStreamRequest,
1540        placement: ShardPlacement,
1541        admission: ColdWriteAdmission,
1542    ) -> GroupCreateStreamFuture<'a> {
1543        if !admission.is_enabled() {
1544            return self.create_stream(request, placement);
1545        }
1546        Box::pin(
1547            async move { self.create_stream_with_admission_inner(request, placement, admission) },
1548        )
1549    }
1550
1551    fn create_stream_external<'a>(
1552        &'a mut self,
1553        request: CreateStreamExternalRequest,
1554        placement: ShardPlacement,
1555    ) -> GroupCreateStreamFuture<'a> {
1556        let command = GroupWriteCommand::from(request);
1557        Box::pin(async move {
1558            match self.apply_committed_write(command, placement)? {
1559                GroupWriteResponse::CreateStream(response) => Ok(response),
1560                other => Err(GroupEngineError::new(format!(
1561                    "unexpected external create stream write response: {other:?}"
1562                ))),
1563            }
1564        })
1565    }
1566
1567    fn read_stream<'a>(
1568        &'a mut self,
1569        request: ReadStreamRequest,
1570        placement: ShardPlacement,
1571    ) -> GroupReadStreamFuture<'a> {
1572        Box::pin(async move {
1573            self.read_stream_parts(request, placement)
1574                .await?
1575                .into_response()
1576                .await
1577        })
1578    }
1579
1580    fn read_stream_parts<'a>(
1581        &'a mut self,
1582        request: ReadStreamRequest,
1583        placement: ShardPlacement,
1584    ) -> GroupReadStreamPartsFuture<'a> {
1585        Box::pin(async move {
1586            let stream_id = request.stream_id.clone();
1587            let plan = self.read_stream_plan(&request, placement)?;
1588            Ok(GroupReadStreamParts::from_plan(
1589                placement,
1590                stream_id,
1591                plan,
1592                self.cold_store(),
1593            ))
1594        })
1595    }
1596
1597    fn publish_snapshot<'a>(
1598        &'a mut self,
1599        request: PublishSnapshotRequest,
1600        placement: ShardPlacement,
1601    ) -> GroupPublishSnapshotFuture<'a> {
1602        Box::pin(async move {
1603            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1604            let command = GroupWriteCommand::from(request);
1605            match self.apply_committed_write(command, placement)? {
1606                GroupWriteResponse::PublishSnapshot(response) => Ok(response),
1607                other => Err(GroupEngineError::new(format!(
1608                    "unexpected publish snapshot write response: {other:?}"
1609                ))),
1610            }
1611        })
1612    }
1613
1614    fn read_snapshot<'a>(
1615        &'a mut self,
1616        request: ReadSnapshotRequest,
1617        placement: ShardPlacement,
1618    ) -> GroupReadSnapshotFuture<'a> {
1619        Box::pin(async move {
1620            self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1621            let snapshot = match request.snapshot_offset {
1622                Some(offset) => self
1623                    .state_machine
1624                    .read_snapshot(&request.stream_id, offset)
1625                    .map_err(stream_response_error)?,
1626                None => self
1627                    .state_machine
1628                    .latest_snapshot(&request.stream_id)
1629                    .map_err(stream_response_error)?
1630                    .ok_or_else(|| {
1631                        GroupEngineError::stream(
1632                            StreamErrorCode::SnapshotNotFound,
1633                            format!("stream '{}' has no visible snapshot", request.stream_id),
1634                        )
1635                    })?,
1636            };
1637            let tail_offset = self
1638                .state_machine
1639                .head_at(&request.stream_id, request.now_ms)
1640                .map(|metadata| metadata.tail_offset)
1641                .unwrap_or(snapshot.offset);
1642            Ok(ReadSnapshotResponse {
1643                placement,
1644                snapshot_offset: snapshot.offset,
1645                next_offset: snapshot.offset,
1646                content_type: snapshot.content_type,
1647                payload: snapshot.payload,
1648                up_to_date: snapshot.offset == tail_offset,
1649            })
1650        })
1651    }
1652
1653    fn delete_snapshot<'a>(
1654        &'a mut self,
1655        request: DeleteSnapshotRequest,
1656        placement: ShardPlacement,
1657    ) -> GroupDeleteSnapshotFuture<'a> {
1658        Box::pin(async move {
1659            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1660            match self
1661                .state_machine
1662                .delete_snapshot(&request.stream_id, request.snapshot_offset)
1663            {
1664                StreamResponse::Error {
1665                    code,
1666                    message,
1667                    next_offset,
1668                } => Err(GroupEngineError::stream_with_next_offset(
1669                    code,
1670                    message,
1671                    next_offset,
1672                )),
1673                other => Err(GroupEngineError::new(format!(
1674                    "unexpected delete snapshot response: {other:?}"
1675                ))),
1676            }
1677        })
1678    }
1679
1680    fn bootstrap_stream<'a>(
1681        &'a mut self,
1682        request: BootstrapStreamRequest,
1683        placement: ShardPlacement,
1684    ) -> GroupBootstrapStreamFuture<'a> {
1685        Box::pin(async move {
1686            self.ensure_stream_access(&request.stream_id, request.now_ms, true, placement)?;
1687            let plan = self
1688                .state_machine
1689                .bootstrap_plan(&request.stream_id)
1690                .map_err(stream_response_error)?;
1691            let snapshot_offset = plan.snapshot.as_ref().map(|snapshot| snapshot.offset);
1692            let snapshot_content_type = plan
1693                .snapshot
1694                .as_ref()
1695                .map(|snapshot| snapshot.content_type.clone())
1696                .unwrap_or_else(|| DEFAULT_CONTENT_TYPE.to_owned());
1697            let snapshot_payload = plan
1698                .snapshot
1699                .as_ref()
1700                .map(|snapshot| snapshot.payload.clone())
1701                .unwrap_or_default();
1702            let updates = self
1703                .bootstrap_updates(
1704                    &request.stream_id,
1705                    &plan.updates,
1706                    &plan.content_type,
1707                    request.now_ms,
1708                )
1709                .await?;
1710            Ok(BootstrapStreamResponse {
1711                placement,
1712                snapshot_offset,
1713                snapshot_content_type,
1714                snapshot_payload,
1715                updates,
1716                next_offset: plan.next_offset,
1717                up_to_date: plan.up_to_date,
1718                closed: plan.closed,
1719            })
1720        })
1721    }
1722
1723    fn touch_stream_access<'a>(
1724        &'a mut self,
1725        stream_id: BucketStreamId,
1726        now_ms: u64,
1727        renew_ttl: bool,
1728        placement: ShardPlacement,
1729    ) -> GroupTouchStreamAccessFuture<'a> {
1730        Box::pin(async move { self.apply_access_command(stream_id, now_ms, renew_ttl, placement) })
1731    }
1732
1733    fn add_fork_ref<'a>(
1734        &'a mut self,
1735        stream_id: BucketStreamId,
1736        now_ms: u64,
1737        placement: ShardPlacement,
1738    ) -> GroupForkRefFuture<'a> {
1739        Box::pin(async move {
1740            match self.apply_committed_write(
1741                GroupWriteCommand::AddForkRef { stream_id, now_ms },
1742                placement,
1743            )? {
1744                GroupWriteResponse::AddForkRef(response) => Ok(response),
1745                other => Err(GroupEngineError::new(format!(
1746                    "unexpected add fork ref write response: {other:?}"
1747                ))),
1748            }
1749        })
1750    }
1751
1752    fn release_fork_ref<'a>(
1753        &'a mut self,
1754        stream_id: BucketStreamId,
1755        placement: ShardPlacement,
1756    ) -> GroupForkRefFuture<'a> {
1757        Box::pin(async move {
1758            match self
1759                .apply_committed_write(GroupWriteCommand::ReleaseForkRef { stream_id }, placement)?
1760            {
1761                GroupWriteResponse::ReleaseForkRef(response) => Ok(response),
1762                other => Err(GroupEngineError::new(format!(
1763                    "unexpected release fork ref write response: {other:?}"
1764                ))),
1765            }
1766        })
1767    }
1768
1769    fn head_stream<'a>(
1770        &'a mut self,
1771        request: HeadStreamRequest,
1772        placement: ShardPlacement,
1773    ) -> GroupHeadStreamFuture<'a> {
1774        Box::pin(async move {
1775            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1776            self.head_stream_after_access(&request, placement)
1777        })
1778    }
1779
1780    fn close_stream<'a>(
1781        &'a mut self,
1782        request: CloseStreamRequest,
1783        placement: ShardPlacement,
1784    ) -> GroupCloseStreamFuture<'a> {
1785        Box::pin(async move {
1786            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1787            let command = GroupWriteCommand::from(request);
1788            match self.apply_committed_write(command, placement)? {
1789                GroupWriteResponse::CloseStream(response) => Ok(response),
1790                other => Err(GroupEngineError::new(format!(
1791                    "unexpected close stream write response: {other:?}"
1792                ))),
1793            }
1794        })
1795    }
1796
1797    fn delete_stream<'a>(
1798        &'a mut self,
1799        request: DeleteStreamRequest,
1800        placement: ShardPlacement,
1801    ) -> GroupDeleteStreamFuture<'a> {
1802        let command = GroupWriteCommand::from(request);
1803        Box::pin(async move {
1804            match self.apply_committed_write(command, placement)? {
1805                GroupWriteResponse::DeleteStream(response) => Ok(response),
1806                other => Err(GroupEngineError::new(format!(
1807                    "unexpected delete stream write response: {other:?}"
1808                ))),
1809            }
1810        })
1811    }
1812
1813    fn append<'a>(
1814        &'a mut self,
1815        request: AppendRequest,
1816        placement: ShardPlacement,
1817    ) -> GroupAppendFuture<'a> {
1818        Box::pin(async move {
1819            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1820            let command = GroupWriteCommand::from(request);
1821            match self.apply_committed_write(command, placement)? {
1822                GroupWriteResponse::Append(response) => Ok(response),
1823                other => Err(GroupEngineError::new(format!(
1824                    "unexpected append write response: {other:?}"
1825                ))),
1826            }
1827        })
1828    }
1829
1830    fn append_with_cold_admission<'a>(
1831        &'a mut self,
1832        request: AppendRequest,
1833        placement: ShardPlacement,
1834        admission: ColdWriteAdmission,
1835    ) -> GroupAppendFuture<'a> {
1836        if !admission.is_enabled() {
1837            return self.append(request, placement);
1838        }
1839        Box::pin(async move { self.append_with_admission_inner(request, placement, admission) })
1840    }
1841
1842    fn append_external<'a>(
1843        &'a mut self,
1844        request: AppendExternalRequest,
1845        placement: ShardPlacement,
1846    ) -> GroupAppendFuture<'a> {
1847        Box::pin(async move {
1848            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1849            let command = GroupWriteCommand::from(request);
1850            match self.apply_committed_write(command, placement)? {
1851                GroupWriteResponse::Append(response) => Ok(response),
1852                other => Err(GroupEngineError::new(format!(
1853                    "unexpected external append write response: {other:?}"
1854                ))),
1855            }
1856        })
1857    }
1858
1859    fn append_batch<'a>(
1860        &'a mut self,
1861        request: AppendBatchRequest,
1862        placement: ShardPlacement,
1863    ) -> GroupAppendBatchFuture<'a> {
1864        Box::pin(async move {
1865            self.ensure_stream_access(&request.stream_id, request.now_ms, false, placement)?;
1866            let command = GroupWriteCommand::from(request);
1867            match self.apply_committed_write(command, placement)? {
1868                GroupWriteResponse::AppendBatch(response) => Ok(response),
1869                other => Err(GroupEngineError::new(format!(
1870                    "unexpected append batch write response: {other:?}"
1871                ))),
1872            }
1873        })
1874    }
1875
1876    fn append_batch_with_cold_admission<'a>(
1877        &'a mut self,
1878        request: AppendBatchRequest,
1879        placement: ShardPlacement,
1880        admission: ColdWriteAdmission,
1881    ) -> GroupAppendBatchFuture<'a> {
1882        if !admission.is_enabled() {
1883            return self.append_batch(request, placement);
1884        }
1885        Box::pin(
1886            async move { self.append_batch_with_admission_inner(request, placement, admission) },
1887        )
1888    }
1889
1890    fn flush_cold<'a>(
1891        &'a mut self,
1892        request: FlushColdRequest,
1893        placement: ShardPlacement,
1894    ) -> GroupFlushColdFuture<'a> {
1895        let command = GroupWriteCommand::from(request);
1896        Box::pin(async move {
1897            match self.apply_committed_write(command, placement)? {
1898                GroupWriteResponse::FlushCold(response) => Ok(response),
1899                other => Err(GroupEngineError::new(format!(
1900                    "unexpected flush cold write response: {other:?}"
1901                ))),
1902            }
1903        })
1904    }
1905
1906    fn plan_cold_flush<'a>(
1907        &'a mut self,
1908        request: PlanColdFlushRequest,
1909        _placement: ShardPlacement,
1910    ) -> GroupPlanColdFlushFuture<'a> {
1911        Box::pin(async move {
1912            self.state_machine
1913                .plan_cold_flush(
1914                    &request.stream_id,
1915                    request.min_hot_bytes,
1916                    request.max_flush_bytes,
1917                )
1918                .map_err(stream_response_error)
1919        })
1920    }
1921
1922    fn plan_next_cold_flush<'a>(
1923        &'a mut self,
1924        request: PlanGroupColdFlushRequest,
1925        _placement: ShardPlacement,
1926    ) -> GroupPlanNextColdFlushFuture<'a> {
1927        Box::pin(async move {
1928            self.state_machine
1929                .plan_next_cold_flush(request.min_hot_bytes, request.max_flush_bytes)
1930                .map_err(stream_response_error)
1931        })
1932    }
1933
1934    fn plan_next_cold_flush_batch<'a>(
1935        &'a mut self,
1936        request: PlanGroupColdFlushRequest,
1937        _placement: ShardPlacement,
1938        max_candidates: usize,
1939    ) -> GroupPlanNextColdFlushBatchFuture<'a> {
1940        Box::pin(async move {
1941            self.state_machine
1942                .plan_next_cold_flush_batch(
1943                    request.min_hot_bytes,
1944                    request.max_flush_bytes,
1945                    max_candidates,
1946                )
1947                .map_err(stream_response_error)
1948        })
1949    }
1950
1951    fn cold_hot_backlog<'a>(
1952        &'a mut self,
1953        stream_id: BucketStreamId,
1954        _placement: ShardPlacement,
1955    ) -> GroupColdHotBacklogFuture<'a> {
1956        Box::pin(async move { self.cold_hot_backlog_for(stream_id) })
1957    }
1958
1959    fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a> {
1960        Box::pin(async move { Ok(self.build_snapshot(placement)) })
1961    }
1962
1963    fn install_snapshot<'a>(
1964        &'a mut self,
1965        snapshot: GroupSnapshot,
1966    ) -> GroupInstallSnapshotFuture<'a> {
1967        Box::pin(async move { self.install_snapshot_inner(snapshot) })
1968    }
1969}
1970
1971#[derive(Debug, Clone, Default)]
1972pub struct InMemoryGroupEngineFactory {
1973    cold_store: Option<ColdStoreHandle>,
1974}
1975
1976impl InMemoryGroupEngineFactory {
1977    pub fn new() -> Self {
1978        Self::default()
1979    }
1980
1981    pub fn with_cold_store(cold_store: Option<ColdStoreHandle>) -> Self {
1982        Self { cold_store }
1983    }
1984}
1985
1986impl GroupEngineFactory for InMemoryGroupEngineFactory {
1987    fn create<'a>(
1988        &'a self,
1989        _placement: ShardPlacement,
1990        _metrics: GroupEngineMetrics,
1991    ) -> GroupEngineCreateFuture<'a> {
1992        Box::pin(async move {
1993            let engine = InMemoryGroupEngine {
1994                cold_store: self.cold_store.clone(),
1995                ..InMemoryGroupEngine::default()
1996            };
1997            let engine: Box<dyn GroupEngine> = Box::new(engine);
1998            Ok(engine)
1999        })
2000    }
2001}
2002
2003pub(crate) fn compare_stream_ids(
2004    left: &BucketStreamId,
2005    right: &BucketStreamId,
2006) -> std::cmp::Ordering {
2007    left.bucket_id
2008        .cmp(&right.bucket_id)
2009        .then_with(|| left.stream_id.cmp(&right.stream_id))
2010}
2011pub(crate) fn ensure_bucket_exists(
2012    state_machine: &mut StreamStateMachine,
2013    stream_id: &BucketStreamId,
2014) -> Result<(), GroupEngineError> {
2015    if state_machine.bucket_exists(&stream_id.bucket_id) {
2016        return Ok(());
2017    }
2018
2019    match state_machine.apply(StreamCommand::CreateBucket {
2020        bucket_id: stream_id.bucket_id.clone(),
2021    }) {
2022        StreamResponse::BucketCreated { .. } | StreamResponse::BucketAlreadyExists { .. } => Ok(()),
2023        StreamResponse::Error {
2024            code,
2025            message,
2026            next_offset,
2027        } => Err(GroupEngineError::stream_with_next_offset(
2028            code,
2029            message,
2030            next_offset,
2031        )),
2032        other => Err(GroupEngineError::new(format!(
2033            "unexpected create bucket response: {other:?}"
2034        ))),
2035    }
2036}
2037
2038pub(crate) fn stream_response_error(response: StreamResponse) -> GroupEngineError {
2039    match response {
2040        StreamResponse::Error {
2041            code,
2042            message,
2043            next_offset,
2044        } => GroupEngineError::stream_with_next_offset(code, message, next_offset),
2045        other => GroupEngineError::new(format!("unexpected stream response error: {other:?}")),
2046    }
2047}
2048
2049pub(crate) fn restore_stream_append_counts(
2050    counts: Vec<StreamAppendCount>,
2051    snapshot_stream_ids: &HashSet<BucketStreamId>,
2052) -> Result<HashMap<BucketStreamId, u64>, GroupEngineError> {
2053    let mut restored = HashMap::with_capacity(counts.len());
2054    for count in counts {
2055        if !snapshot_stream_ids.contains(&count.stream_id) {
2056            return Err(GroupEngineError::new(format!(
2057                "append count references missing snapshot stream '{}'",
2058                count.stream_id
2059            )));
2060        }
2061        if restored
2062            .insert(count.stream_id.clone(), count.append_count)
2063            .is_some()
2064        {
2065            return Err(GroupEngineError::new(format!(
2066                "snapshot contains duplicate append count for stream '{}'",
2067                count.stream_id
2068            )));
2069        }
2070    }
2071    Ok(restored)
2072}