Skip to main content

ursula_runtime/
engine.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use ursula_shard::{BucketStreamId, ShardPlacement};
7use ursula_stream::{ColdFlushCandidate, StreamErrorCode};
8
9use crate::command::{GroupSnapshot, GroupWriteCommand};
10use crate::metrics::{RaftWriteManySample, RuntimeMetricsInner};
11use crate::request::{
12    AppendBatchRequest, AppendExternalRequest, AppendRequest, AppendResponse,
13    BootstrapStreamRequest, BootstrapStreamResponse, CloseStreamRequest, CloseStreamResponse,
14    ColdHotBacklog, ColdWriteAdmission, CreateStreamExternalRequest, CreateStreamRequest,
15    CreateStreamResponse, DeleteSnapshotRequest, DeleteStreamRequest, DeleteStreamResponse,
16    FlushColdRequest, FlushColdResponse, ForkRefResponse, GroupReadStreamParts, HeadStreamRequest,
17    HeadStreamResponse, PlanColdFlushRequest, PlanGroupColdFlushRequest, PublishSnapshotRequest,
18    PublishSnapshotResponse, ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest,
19    ReadStreamResponse, TouchStreamAccessResponse,
20};
21
22pub type GroupAppendFuture<'a> =
23    Pin<Box<dyn Future<Output = Result<AppendResponse, GroupEngineError>> + Send + 'a>>;
24pub type GroupAppendBatchFuture<'a> =
25    Pin<Box<dyn Future<Output = Result<GroupAppendBatchResponse, GroupEngineError>> + Send + 'a>>;
26pub type GroupFlushColdFuture<'a> =
27    Pin<Box<dyn Future<Output = Result<FlushColdResponse, GroupEngineError>> + Send + 'a>>;
28pub type GroupPlanColdFlushFuture<'a> =
29    Pin<Box<dyn Future<Output = Result<Option<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
30pub type GroupPlanNextColdFlushFuture<'a> =
31    Pin<Box<dyn Future<Output = Result<Option<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
32pub type GroupPlanNextColdFlushBatchFuture<'a> =
33    Pin<Box<dyn Future<Output = Result<Vec<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
34pub type GroupColdHotBacklogFuture<'a> =
35    Pin<Box<dyn Future<Output = Result<ColdHotBacklog, GroupEngineError>> + Send + 'a>>;
36pub type GroupCreateStreamFuture<'a> =
37    Pin<Box<dyn Future<Output = Result<CreateStreamResponse, GroupEngineError>> + Send + 'a>>;
38pub type GroupHeadStreamFuture<'a> =
39    Pin<Box<dyn Future<Output = Result<HeadStreamResponse, GroupEngineError>> + Send + 'a>>;
40pub type GroupReadStreamFuture<'a> =
41    Pin<Box<dyn Future<Output = Result<ReadStreamResponse, GroupEngineError>> + Send + 'a>>;
42pub type GroupReadStreamPartsFuture<'a> =
43    Pin<Box<dyn Future<Output = Result<GroupReadStreamParts, GroupEngineError>> + Send + 'a>>;
44pub type GroupRequireLiveReadOwnerFuture<'a> =
45    Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
46pub type GroupPublishSnapshotFuture<'a> =
47    Pin<Box<dyn Future<Output = Result<PublishSnapshotResponse, GroupEngineError>> + Send + 'a>>;
48pub type GroupReadSnapshotFuture<'a> =
49    Pin<Box<dyn Future<Output = Result<ReadSnapshotResponse, GroupEngineError>> + Send + 'a>>;
50pub type GroupDeleteSnapshotFuture<'a> =
51    Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
52pub type GroupBootstrapStreamFuture<'a> =
53    Pin<Box<dyn Future<Output = Result<BootstrapStreamResponse, GroupEngineError>> + Send + 'a>>;
54pub type GroupTouchStreamAccessFuture<'a> =
55    Pin<Box<dyn Future<Output = Result<TouchStreamAccessResponse, GroupEngineError>> + Send + 'a>>;
56pub type GroupCloseStreamFuture<'a> =
57    Pin<Box<dyn Future<Output = Result<CloseStreamResponse, GroupEngineError>> + Send + 'a>>;
58pub type GroupDeleteStreamFuture<'a> =
59    Pin<Box<dyn Future<Output = Result<DeleteStreamResponse, GroupEngineError>> + Send + 'a>>;
60pub type GroupForkRefFuture<'a> =
61    Pin<Box<dyn Future<Output = Result<ForkRefResponse, GroupEngineError>> + Send + 'a>>;
62pub type GroupSnapshotFuture<'a> =
63    Pin<Box<dyn Future<Output = Result<GroupSnapshot, GroupEngineError>> + Send + 'a>>;
64pub type GroupInstallSnapshotFuture<'a> =
65    Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
66pub type GroupWriteBatchFuture<'a> = Pin<
67    Box<
68        dyn Future<
69                Output = Result<
70                    Vec<Result<GroupWriteResponse, GroupEngineError>>,
71                    GroupEngineError,
72                >,
73            > + Send
74            + 'a,
75    >,
76>;
77pub type GroupEngineCreateFuture<'a> =
78    Pin<Box<dyn Future<Output = Result<Box<dyn GroupEngine>, GroupEngineError>> + Send + 'a>>;
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct GroupAppendBatchResponse {
82    pub placement: ShardPlacement,
83    pub items: Vec<Result<AppendResponse, GroupEngineError>>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87pub enum GroupWriteResponse {
88    CreateStream(CreateStreamResponse),
89    Append(AppendResponse),
90    AppendBatch(GroupAppendBatchResponse),
91    PublishSnapshot(PublishSnapshotResponse),
92    TouchStreamAccess(TouchStreamAccessResponse),
93    AddForkRef(ForkRefResponse),
94    ReleaseForkRef(ForkRefResponse),
95    FlushCold(FlushColdResponse),
96    CloseStream(CloseStreamResponse),
97    DeleteStream(DeleteStreamResponse),
98    Batch(Vec<Result<GroupWriteResponse, GroupEngineError>>),
99}
100
101pub trait GroupEngine: Send + 'static {
102    fn accepts_local_writes(&self) -> bool {
103        true
104    }
105
106    fn create_stream<'a>(
107        &'a mut self,
108        request: CreateStreamRequest,
109        placement: ShardPlacement,
110    ) -> GroupCreateStreamFuture<'a>;
111
112    fn create_stream_external<'a>(
113        &'a mut self,
114        request: CreateStreamExternalRequest,
115        _placement: ShardPlacement,
116    ) -> GroupCreateStreamFuture<'a> {
117        Box::pin(async move {
118            Err(GroupEngineError::new(format!(
119                "external stream create is not supported for stream '{}'",
120                request.stream_id
121            )))
122        })
123    }
124
125    fn head_stream<'a>(
126        &'a mut self,
127        request: HeadStreamRequest,
128        placement: ShardPlacement,
129    ) -> GroupHeadStreamFuture<'a>;
130
131    fn read_stream<'a>(
132        &'a mut self,
133        request: ReadStreamRequest,
134        placement: ShardPlacement,
135    ) -> GroupReadStreamFuture<'a>;
136
137    fn read_stream_parts<'a>(
138        &'a mut self,
139        request: ReadStreamRequest,
140        placement: ShardPlacement,
141    ) -> GroupReadStreamPartsFuture<'a> {
142        Box::pin(async move {
143            let response = self.read_stream(request, placement).await?;
144            Ok(GroupReadStreamParts::from_response(response))
145        })
146    }
147
148    fn require_local_live_read_owner<'a>(
149        &'a mut self,
150        _placement: ShardPlacement,
151    ) -> GroupRequireLiveReadOwnerFuture<'a> {
152        Box::pin(async { Ok(()) })
153    }
154
155    fn publish_snapshot<'a>(
156        &'a mut self,
157        request: PublishSnapshotRequest,
158        _placement: ShardPlacement,
159    ) -> GroupPublishSnapshotFuture<'a> {
160        Box::pin(async move {
161            Err(GroupEngineError::new(format!(
162                "snapshot publish is not supported for stream '{}'",
163                request.stream_id
164            )))
165        })
166    }
167
168    fn read_snapshot<'a>(
169        &'a mut self,
170        request: ReadSnapshotRequest,
171        _placement: ShardPlacement,
172    ) -> GroupReadSnapshotFuture<'a> {
173        Box::pin(async move {
174            Err(GroupEngineError::new(format!(
175                "snapshot read is not supported for stream '{}'",
176                request.stream_id
177            )))
178        })
179    }
180
181    fn delete_snapshot<'a>(
182        &'a mut self,
183        request: DeleteSnapshotRequest,
184        _placement: ShardPlacement,
185    ) -> GroupDeleteSnapshotFuture<'a> {
186        Box::pin(async move {
187            Err(GroupEngineError::new(format!(
188                "snapshot delete is not supported for stream '{}'",
189                request.stream_id
190            )))
191        })
192    }
193
194    fn bootstrap_stream<'a>(
195        &'a mut self,
196        request: BootstrapStreamRequest,
197        _placement: ShardPlacement,
198    ) -> GroupBootstrapStreamFuture<'a> {
199        Box::pin(async move {
200            Err(GroupEngineError::new(format!(
201                "bootstrap is not supported for stream '{}'",
202                request.stream_id
203            )))
204        })
205    }
206
207    fn touch_stream_access<'a>(
208        &'a mut self,
209        stream_id: BucketStreamId,
210        now_ms: u64,
211        renew_ttl: bool,
212        placement: ShardPlacement,
213    ) -> GroupTouchStreamAccessFuture<'a>;
214
215    fn add_fork_ref<'a>(
216        &'a mut self,
217        stream_id: BucketStreamId,
218        now_ms: u64,
219        placement: ShardPlacement,
220    ) -> GroupForkRefFuture<'a>;
221
222    fn release_fork_ref<'a>(
223        &'a mut self,
224        stream_id: BucketStreamId,
225        placement: ShardPlacement,
226    ) -> GroupForkRefFuture<'a>;
227
228    fn close_stream<'a>(
229        &'a mut self,
230        request: CloseStreamRequest,
231        placement: ShardPlacement,
232    ) -> GroupCloseStreamFuture<'a>;
233
234    fn delete_stream<'a>(
235        &'a mut self,
236        request: DeleteStreamRequest,
237        placement: ShardPlacement,
238    ) -> GroupDeleteStreamFuture<'a>;
239
240    fn append<'a>(
241        &'a mut self,
242        request: AppendRequest,
243        placement: ShardPlacement,
244    ) -> GroupAppendFuture<'a>;
245
246    fn append_external<'a>(
247        &'a mut self,
248        request: AppendExternalRequest,
249        _placement: ShardPlacement,
250    ) -> GroupAppendFuture<'a> {
251        Box::pin(async move {
252            Err(GroupEngineError::new(format!(
253                "external append is not supported for stream '{}'",
254                request.stream_id
255            )))
256        })
257    }
258
259    fn append_batch<'a>(
260        &'a mut self,
261        request: AppendBatchRequest,
262        placement: ShardPlacement,
263    ) -> GroupAppendBatchFuture<'a>;
264
265    fn create_stream_with_cold_admission<'a>(
266        &'a mut self,
267        request: CreateStreamRequest,
268        placement: ShardPlacement,
269        _admission: ColdWriteAdmission,
270    ) -> GroupCreateStreamFuture<'a> {
271        self.create_stream(request, placement)
272    }
273
274    fn append_with_cold_admission<'a>(
275        &'a mut self,
276        request: AppendRequest,
277        placement: ShardPlacement,
278        _admission: ColdWriteAdmission,
279    ) -> GroupAppendFuture<'a> {
280        self.append(request, placement)
281    }
282
283    fn append_batch_with_cold_admission<'a>(
284        &'a mut self,
285        request: AppendBatchRequest,
286        placement: ShardPlacement,
287        _admission: ColdWriteAdmission,
288    ) -> GroupAppendBatchFuture<'a> {
289        self.append_batch(request, placement)
290    }
291
292    fn append_batch_many_with_cold_admission<'a>(
293        &'a mut self,
294        requests: Vec<AppendBatchRequest>,
295        placement: ShardPlacement,
296        admission: ColdWriteAdmission,
297    ) -> GroupWriteBatchFuture<'a> {
298        Box::pin(async move {
299            let mut responses = Vec::with_capacity(requests.len());
300            for request in requests {
301                let response = self
302                    .append_batch_with_cold_admission(request, placement, admission)
303                    .await
304                    .map(GroupWriteResponse::AppendBatch);
305                responses.push(response);
306            }
307            Ok(responses)
308        })
309    }
310
311    fn flush_cold<'a>(
312        &'a mut self,
313        request: FlushColdRequest,
314        _placement: ShardPlacement,
315    ) -> GroupFlushColdFuture<'a> {
316        Box::pin(async move {
317            Err(GroupEngineError::new(format!(
318                "cold flush is not supported for stream '{}'",
319                request.stream_id
320            )))
321        })
322    }
323
324    fn plan_cold_flush<'a>(
325        &'a mut self,
326        request: PlanColdFlushRequest,
327        _placement: ShardPlacement,
328    ) -> GroupPlanColdFlushFuture<'a> {
329        Box::pin(async move {
330            Err(GroupEngineError::new(format!(
331                "cold flush planning is not supported for stream '{}'",
332                request.stream_id
333            )))
334        })
335    }
336
337    fn plan_next_cold_flush<'a>(
338        &'a mut self,
339        _request: PlanGroupColdFlushRequest,
340        _placement: ShardPlacement,
341    ) -> GroupPlanNextColdFlushFuture<'a> {
342        Box::pin(async move {
343            Err(GroupEngineError::new(
344                "group cold flush planning is not supported",
345            ))
346        })
347    }
348
349    fn plan_next_cold_flush_batch<'a>(
350        &'a mut self,
351        request: PlanGroupColdFlushRequest,
352        placement: ShardPlacement,
353        max_candidates: usize,
354    ) -> GroupPlanNextColdFlushBatchFuture<'a> {
355        Box::pin(async move {
356            match self.plan_next_cold_flush(request, placement).await? {
357                Some(candidate) if max_candidates > 0 => Ok(vec![candidate]),
358                _ => Ok(Vec::new()),
359            }
360        })
361    }
362
363    fn cold_hot_backlog<'a>(
364        &'a mut self,
365        stream_id: BucketStreamId,
366        _placement: ShardPlacement,
367    ) -> GroupColdHotBacklogFuture<'a> {
368        Box::pin(async move {
369            Err(GroupEngineError::new(format!(
370                "cold hot backlog is not supported for stream '{stream_id}'"
371            )))
372        })
373    }
374
375    fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a>;
376
377    fn install_snapshot<'a>(
378        &'a mut self,
379        snapshot: GroupSnapshot,
380    ) -> GroupInstallSnapshotFuture<'a>;
381
382    fn write_batch<'a>(
383        &'a mut self,
384        commands: Vec<GroupWriteCommand>,
385        placement: ShardPlacement,
386    ) -> GroupWriteBatchFuture<'a> {
387        Box::pin(async move {
388            let mut responses = Vec::with_capacity(commands.len());
389            for command in commands {
390                let response = match command {
391                    GroupWriteCommand::CreateStream {
392                        stream_id,
393                        content_type,
394                        initial_payload,
395                        close_after,
396                        stream_seq,
397                        producer,
398                        stream_ttl_seconds,
399                        stream_expires_at_ms,
400                        forked_from,
401                        fork_offset,
402                        now_ms,
403                    } => self
404                        .create_stream(
405                            CreateStreamRequest {
406                                stream_id,
407                                content_type,
408                                content_type_explicit: true,
409                                initial_payload,
410                                close_after,
411                                stream_seq,
412                                producer,
413                                stream_ttl_seconds,
414                                stream_expires_at_ms,
415                                forked_from,
416                                fork_offset,
417                                now_ms,
418                            },
419                            placement,
420                        )
421                        .await
422                        .map(GroupWriteResponse::CreateStream),
423                    GroupWriteCommand::CreateExternal {
424                        stream_id,
425                        content_type,
426                        initial_payload,
427                        close_after,
428                        stream_seq,
429                        producer,
430                        stream_ttl_seconds,
431                        stream_expires_at_ms,
432                        forked_from,
433                        fork_offset,
434                        now_ms,
435                    } => self
436                        .create_stream_external(
437                            CreateStreamExternalRequest {
438                                stream_id,
439                                content_type,
440                                initial_payload,
441                                close_after,
442                                stream_seq,
443                                producer,
444                                stream_ttl_seconds,
445                                stream_expires_at_ms,
446                                forked_from,
447                                fork_offset,
448                                now_ms,
449                            },
450                            placement,
451                        )
452                        .await
453                        .map(GroupWriteResponse::CreateStream),
454                    GroupWriteCommand::Append {
455                        stream_id,
456                        content_type,
457                        payload,
458                        close_after,
459                        stream_seq,
460                        producer,
461                        now_ms,
462                    } => self
463                        .append(
464                            AppendRequest {
465                                stream_id,
466                                content_type,
467                                payload,
468                                close_after,
469                                stream_seq,
470                                producer,
471                                now_ms,
472                            },
473                            placement,
474                        )
475                        .await
476                        .map(GroupWriteResponse::Append),
477                    GroupWriteCommand::AppendExternal {
478                        stream_id,
479                        content_type,
480                        payload,
481                        close_after,
482                        stream_seq,
483                        producer,
484                        now_ms,
485                    } => self
486                        .append_external(
487                            AppendExternalRequest {
488                                stream_id,
489                                content_type,
490                                payload,
491                                close_after,
492                                stream_seq,
493                                producer,
494                                now_ms,
495                            },
496                            placement,
497                        )
498                        .await
499                        .map(GroupWriteResponse::Append),
500                    GroupWriteCommand::AppendBatch {
501                        stream_id,
502                        content_type,
503                        payloads,
504                        producer,
505                        now_ms,
506                    } => self
507                        .append_batch(
508                            AppendBatchRequest {
509                                stream_id,
510                                content_type,
511                                payloads,
512                                producer,
513                                now_ms,
514                            },
515                            placement,
516                        )
517                        .await
518                        .map(GroupWriteResponse::AppendBatch),
519                    GroupWriteCommand::PublishSnapshot {
520                        stream_id,
521                        snapshot_offset,
522                        content_type,
523                        payload,
524                        now_ms,
525                    } => self
526                        .publish_snapshot(
527                            PublishSnapshotRequest {
528                                stream_id,
529                                snapshot_offset,
530                                content_type,
531                                payload,
532                                now_ms,
533                            },
534                            placement,
535                        )
536                        .await
537                        .map(GroupWriteResponse::PublishSnapshot),
538                    GroupWriteCommand::TouchStreamAccess {
539                        stream_id,
540                        now_ms,
541                        renew_ttl,
542                    } => self
543                        .touch_stream_access(stream_id, now_ms, renew_ttl, placement)
544                        .await
545                        .map(GroupWriteResponse::TouchStreamAccess),
546                    GroupWriteCommand::AddForkRef { stream_id, now_ms } => self
547                        .add_fork_ref(stream_id, now_ms, placement)
548                        .await
549                        .map(GroupWriteResponse::AddForkRef),
550                    GroupWriteCommand::ReleaseForkRef { stream_id } => self
551                        .release_fork_ref(stream_id, placement)
552                        .await
553                        .map(GroupWriteResponse::ReleaseForkRef),
554                    GroupWriteCommand::FlushCold { stream_id, chunk } => self
555                        .flush_cold(FlushColdRequest { stream_id, chunk }, placement)
556                        .await
557                        .map(GroupWriteResponse::FlushCold),
558                    GroupWriteCommand::CloseStream {
559                        stream_id,
560                        stream_seq,
561                        producer,
562                        now_ms,
563                    } => self
564                        .close_stream(
565                            CloseStreamRequest {
566                                stream_id,
567                                stream_seq,
568                                producer,
569                                now_ms,
570                            },
571                            placement,
572                        )
573                        .await
574                        .map(GroupWriteResponse::CloseStream),
575                    GroupWriteCommand::DeleteStream { stream_id } => self
576                        .delete_stream(DeleteStreamRequest { stream_id }, placement)
577                        .await
578                        .map(GroupWriteResponse::DeleteStream),
579                    GroupWriteCommand::Batch { commands } => self
580                        .write_batch(commands, placement)
581                        .await
582                        .map(GroupWriteResponse::Batch),
583                };
584                responses.push(response);
585            }
586            Ok(responses)
587        })
588    }
589}
590
591pub trait GroupEngineFactory: Send + Sync + 'static {
592    fn create<'a>(
593        &'a self,
594        placement: ShardPlacement,
595        metrics: GroupEngineMetrics,
596    ) -> GroupEngineCreateFuture<'a>;
597}
598
599#[derive(Debug, Clone)]
600pub struct GroupEngineMetrics {
601    pub(crate) inner: Arc<RuntimeMetricsInner>,
602}
603
604impl GroupEngineMetrics {
605    pub fn record_wal_batch(
606        &self,
607        placement: ShardPlacement,
608        record_count: usize,
609        write_ns: u64,
610        sync_ns: u64,
611    ) {
612        self.inner.record_wal_batch(
613            placement.core_id,
614            placement.raft_group_id,
615            u64::try_from(record_count).expect("record count fits u64"),
616            write_ns,
617            sync_ns,
618        );
619    }
620
621    pub fn record_raft_write_many(
622        &self,
623        placement: ShardPlacement,
624        command_count: usize,
625        logical_command_count: usize,
626        response_count: usize,
627        submit_ns: u64,
628        response_ns: u64,
629    ) {
630        self.inner.record_raft_write_many(
631            placement.core_id,
632            placement.raft_group_id,
633            RaftWriteManySample {
634                command_count: u64::try_from(command_count).expect("command count fits u64"),
635                logical_command_count: u64::try_from(logical_command_count)
636                    .expect("logical command count fits u64"),
637                response_count: u64::try_from(response_count).expect("response count fits u64"),
638                submit_ns,
639                response_ns,
640            },
641        );
642    }
643
644    pub fn record_raft_apply_batch(
645        &self,
646        placement: ShardPlacement,
647        entry_count: usize,
648        apply_ns: u64,
649    ) {
650        self.inner.record_raft_apply_batch(
651            placement.core_id,
652            placement.raft_group_id,
653            u64::try_from(entry_count).expect("entry count fits u64"),
654            apply_ns,
655        );
656    }
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
660pub struct GroupLeaderHint {
661    pub node_id: Option<u64>,
662    pub address: Option<String>,
663}
664
665#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
666pub struct GroupEngineError {
667    message: String,
668    code: Option<StreamErrorCode>,
669    next_offset: Option<u64>,
670    #[serde(default, skip_serializing_if = "Option::is_none")]
671    leader_hint: Option<GroupLeaderHint>,
672}
673
674impl GroupEngineError {
675    pub fn new(message: impl Into<String>) -> Self {
676        Self {
677            message: message.into(),
678            code: None,
679            next_offset: None,
680            leader_hint: None,
681        }
682    }
683
684    pub fn stream(code: StreamErrorCode, message: impl Into<String>) -> Self {
685        Self::stream_with_next_offset(code, message, None)
686    }
687
688    pub fn stream_with_next_offset(
689        code: StreamErrorCode,
690        message: impl Into<String>,
691        next_offset: Option<u64>,
692    ) -> Self {
693        Self {
694            message: format!("{code:?}: {}", message.into()),
695            code: Some(code),
696            next_offset,
697            leader_hint: None,
698        }
699    }
700
701    pub fn forward_to_leader(
702        message: impl Into<String>,
703        node_id: Option<u64>,
704        address: Option<String>,
705    ) -> Self {
706        Self {
707            message: message.into(),
708            code: None,
709            next_offset: None,
710            leader_hint: Some(GroupLeaderHint { node_id, address }),
711        }
712    }
713
714    pub fn from_replicated_parts(
715        message: impl Into<String>,
716        code: Option<StreamErrorCode>,
717        next_offset: Option<u64>,
718        leader_hint: Option<GroupLeaderHint>,
719    ) -> Self {
720        Self {
721            message: message.into(),
722            code,
723            next_offset,
724            leader_hint,
725        }
726    }
727
728    pub fn message(&self) -> &str {
729        &self.message
730    }
731
732    pub fn code(&self) -> Option<StreamErrorCode> {
733        self.code
734    }
735
736    pub fn next_offset(&self) -> Option<u64> {
737        self.next_offset
738    }
739
740    pub fn leader_hint(&self) -> Option<&GroupLeaderHint> {
741        self.leader_hint.as_ref()
742    }
743}
744
745impl std::fmt::Display for GroupEngineError {
746    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747        f.write_str(&self.message)
748    }
749}
750
751impl std::error::Error for GroupEngineError {}