1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use ursula_shard::{BucketStreamId, ShardPlacement};
7use ursula_stream::{ColdFlushCandidate, StreamErrorCode};
8
9use crate::command::{GroupSnapshot, GroupWriteCommand};
10use crate::metrics::{RaftWriteManySample, RuntimeMetricsInner};
11use crate::request::{
12 AppendBatchRequest, AppendExternalRequest, AppendRequest, AppendResponse,
13 BootstrapStreamRequest, BootstrapStreamResponse, CloseStreamRequest, CloseStreamResponse,
14 ColdHotBacklog, ColdWriteAdmission, CreateStreamExternalRequest, CreateStreamRequest,
15 CreateStreamResponse, DeleteSnapshotRequest, DeleteStreamRequest, DeleteStreamResponse,
16 FlushColdRequest, FlushColdResponse, ForkRefResponse, GroupReadStreamParts, HeadStreamRequest,
17 HeadStreamResponse, PlanColdFlushRequest, PlanGroupColdFlushRequest, PublishSnapshotRequest,
18 PublishSnapshotResponse, ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest,
19 ReadStreamResponse, TouchStreamAccessResponse,
20};
21
22pub type GroupAppendFuture<'a> =
23 Pin<Box<dyn Future<Output = Result<AppendResponse, GroupEngineError>> + Send + 'a>>;
24pub type GroupAppendBatchFuture<'a> =
25 Pin<Box<dyn Future<Output = Result<GroupAppendBatchResponse, GroupEngineError>> + Send + 'a>>;
26pub type GroupFlushColdFuture<'a> =
27 Pin<Box<dyn Future<Output = Result<FlushColdResponse, GroupEngineError>> + Send + 'a>>;
28pub type GroupPlanColdFlushFuture<'a> =
29 Pin<Box<dyn Future<Output = Result<Option<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
30pub type GroupPlanNextColdFlushFuture<'a> =
31 Pin<Box<dyn Future<Output = Result<Option<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
32pub type GroupPlanNextColdFlushBatchFuture<'a> =
33 Pin<Box<dyn Future<Output = Result<Vec<ColdFlushCandidate>, GroupEngineError>> + Send + 'a>>;
34pub type GroupColdHotBacklogFuture<'a> =
35 Pin<Box<dyn Future<Output = Result<ColdHotBacklog, GroupEngineError>> + Send + 'a>>;
36pub type GroupCreateStreamFuture<'a> =
37 Pin<Box<dyn Future<Output = Result<CreateStreamResponse, GroupEngineError>> + Send + 'a>>;
38pub type GroupHeadStreamFuture<'a> =
39 Pin<Box<dyn Future<Output = Result<HeadStreamResponse, GroupEngineError>> + Send + 'a>>;
40pub type GroupReadStreamFuture<'a> =
41 Pin<Box<dyn Future<Output = Result<ReadStreamResponse, GroupEngineError>> + Send + 'a>>;
42pub type GroupReadStreamPartsFuture<'a> =
43 Pin<Box<dyn Future<Output = Result<GroupReadStreamParts, GroupEngineError>> + Send + 'a>>;
44pub type GroupRequireLiveReadOwnerFuture<'a> =
45 Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
46pub type GroupPublishSnapshotFuture<'a> =
47 Pin<Box<dyn Future<Output = Result<PublishSnapshotResponse, GroupEngineError>> + Send + 'a>>;
48pub type GroupReadSnapshotFuture<'a> =
49 Pin<Box<dyn Future<Output = Result<ReadSnapshotResponse, GroupEngineError>> + Send + 'a>>;
50pub type GroupDeleteSnapshotFuture<'a> =
51 Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
52pub type GroupBootstrapStreamFuture<'a> =
53 Pin<Box<dyn Future<Output = Result<BootstrapStreamResponse, GroupEngineError>> + Send + 'a>>;
54pub type GroupTouchStreamAccessFuture<'a> =
55 Pin<Box<dyn Future<Output = Result<TouchStreamAccessResponse, GroupEngineError>> + Send + 'a>>;
56pub type GroupCloseStreamFuture<'a> =
57 Pin<Box<dyn Future<Output = Result<CloseStreamResponse, GroupEngineError>> + Send + 'a>>;
58pub type GroupDeleteStreamFuture<'a> =
59 Pin<Box<dyn Future<Output = Result<DeleteStreamResponse, GroupEngineError>> + Send + 'a>>;
60pub type GroupForkRefFuture<'a> =
61 Pin<Box<dyn Future<Output = Result<ForkRefResponse, GroupEngineError>> + Send + 'a>>;
62pub type GroupSnapshotFuture<'a> =
63 Pin<Box<dyn Future<Output = Result<GroupSnapshot, GroupEngineError>> + Send + 'a>>;
64pub type GroupInstallSnapshotFuture<'a> =
65 Pin<Box<dyn Future<Output = Result<(), GroupEngineError>> + Send + 'a>>;
66pub type GroupWriteBatchFuture<'a> = Pin<
67 Box<
68 dyn Future<
69 Output = Result<
70 Vec<Result<GroupWriteResponse, GroupEngineError>>,
71 GroupEngineError,
72 >,
73 > + Send
74 + 'a,
75 >,
76>;
77pub type GroupEngineCreateFuture<'a> =
78 Pin<Box<dyn Future<Output = Result<Box<dyn GroupEngine>, GroupEngineError>> + Send + 'a>>;
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct GroupAppendBatchResponse {
82 pub placement: ShardPlacement,
83 pub items: Vec<Result<AppendResponse, GroupEngineError>>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87pub enum GroupWriteResponse {
88 CreateStream(CreateStreamResponse),
89 Append(AppendResponse),
90 AppendBatch(GroupAppendBatchResponse),
91 PublishSnapshot(PublishSnapshotResponse),
92 TouchStreamAccess(TouchStreamAccessResponse),
93 AddForkRef(ForkRefResponse),
94 ReleaseForkRef(ForkRefResponse),
95 FlushCold(FlushColdResponse),
96 CloseStream(CloseStreamResponse),
97 DeleteStream(DeleteStreamResponse),
98 Batch(Vec<Result<GroupWriteResponse, GroupEngineError>>),
99}
100
101pub trait GroupEngine: Send + 'static {
102 fn accepts_local_writes(&self) -> bool {
103 true
104 }
105
106 fn create_stream<'a>(
107 &'a mut self,
108 request: CreateStreamRequest,
109 placement: ShardPlacement,
110 ) -> GroupCreateStreamFuture<'a>;
111
112 fn create_stream_external<'a>(
113 &'a mut self,
114 request: CreateStreamExternalRequest,
115 _placement: ShardPlacement,
116 ) -> GroupCreateStreamFuture<'a> {
117 Box::pin(async move {
118 Err(GroupEngineError::new(format!(
119 "external stream create is not supported for stream '{}'",
120 request.stream_id
121 )))
122 })
123 }
124
125 fn head_stream<'a>(
126 &'a mut self,
127 request: HeadStreamRequest,
128 placement: ShardPlacement,
129 ) -> GroupHeadStreamFuture<'a>;
130
131 fn read_stream<'a>(
132 &'a mut self,
133 request: ReadStreamRequest,
134 placement: ShardPlacement,
135 ) -> GroupReadStreamFuture<'a>;
136
137 fn read_stream_parts<'a>(
138 &'a mut self,
139 request: ReadStreamRequest,
140 placement: ShardPlacement,
141 ) -> GroupReadStreamPartsFuture<'a> {
142 Box::pin(async move {
143 let response = self.read_stream(request, placement).await?;
144 Ok(GroupReadStreamParts::from_response(response))
145 })
146 }
147
148 fn require_local_live_read_owner<'a>(
149 &'a mut self,
150 _placement: ShardPlacement,
151 ) -> GroupRequireLiveReadOwnerFuture<'a> {
152 Box::pin(async { Ok(()) })
153 }
154
155 fn publish_snapshot<'a>(
156 &'a mut self,
157 request: PublishSnapshotRequest,
158 _placement: ShardPlacement,
159 ) -> GroupPublishSnapshotFuture<'a> {
160 Box::pin(async move {
161 Err(GroupEngineError::new(format!(
162 "snapshot publish is not supported for stream '{}'",
163 request.stream_id
164 )))
165 })
166 }
167
168 fn read_snapshot<'a>(
169 &'a mut self,
170 request: ReadSnapshotRequest,
171 _placement: ShardPlacement,
172 ) -> GroupReadSnapshotFuture<'a> {
173 Box::pin(async move {
174 Err(GroupEngineError::new(format!(
175 "snapshot read is not supported for stream '{}'",
176 request.stream_id
177 )))
178 })
179 }
180
181 fn delete_snapshot<'a>(
182 &'a mut self,
183 request: DeleteSnapshotRequest,
184 _placement: ShardPlacement,
185 ) -> GroupDeleteSnapshotFuture<'a> {
186 Box::pin(async move {
187 Err(GroupEngineError::new(format!(
188 "snapshot delete is not supported for stream '{}'",
189 request.stream_id
190 )))
191 })
192 }
193
194 fn bootstrap_stream<'a>(
195 &'a mut self,
196 request: BootstrapStreamRequest,
197 _placement: ShardPlacement,
198 ) -> GroupBootstrapStreamFuture<'a> {
199 Box::pin(async move {
200 Err(GroupEngineError::new(format!(
201 "bootstrap is not supported for stream '{}'",
202 request.stream_id
203 )))
204 })
205 }
206
207 fn touch_stream_access<'a>(
208 &'a mut self,
209 stream_id: BucketStreamId,
210 now_ms: u64,
211 renew_ttl: bool,
212 placement: ShardPlacement,
213 ) -> GroupTouchStreamAccessFuture<'a>;
214
215 fn add_fork_ref<'a>(
216 &'a mut self,
217 stream_id: BucketStreamId,
218 now_ms: u64,
219 placement: ShardPlacement,
220 ) -> GroupForkRefFuture<'a>;
221
222 fn release_fork_ref<'a>(
223 &'a mut self,
224 stream_id: BucketStreamId,
225 placement: ShardPlacement,
226 ) -> GroupForkRefFuture<'a>;
227
228 fn close_stream<'a>(
229 &'a mut self,
230 request: CloseStreamRequest,
231 placement: ShardPlacement,
232 ) -> GroupCloseStreamFuture<'a>;
233
234 fn delete_stream<'a>(
235 &'a mut self,
236 request: DeleteStreamRequest,
237 placement: ShardPlacement,
238 ) -> GroupDeleteStreamFuture<'a>;
239
240 fn append<'a>(
241 &'a mut self,
242 request: AppendRequest,
243 placement: ShardPlacement,
244 ) -> GroupAppendFuture<'a>;
245
246 fn append_external<'a>(
247 &'a mut self,
248 request: AppendExternalRequest,
249 _placement: ShardPlacement,
250 ) -> GroupAppendFuture<'a> {
251 Box::pin(async move {
252 Err(GroupEngineError::new(format!(
253 "external append is not supported for stream '{}'",
254 request.stream_id
255 )))
256 })
257 }
258
259 fn append_batch<'a>(
260 &'a mut self,
261 request: AppendBatchRequest,
262 placement: ShardPlacement,
263 ) -> GroupAppendBatchFuture<'a>;
264
265 fn create_stream_with_cold_admission<'a>(
266 &'a mut self,
267 request: CreateStreamRequest,
268 placement: ShardPlacement,
269 _admission: ColdWriteAdmission,
270 ) -> GroupCreateStreamFuture<'a> {
271 self.create_stream(request, placement)
272 }
273
274 fn append_with_cold_admission<'a>(
275 &'a mut self,
276 request: AppendRequest,
277 placement: ShardPlacement,
278 _admission: ColdWriteAdmission,
279 ) -> GroupAppendFuture<'a> {
280 self.append(request, placement)
281 }
282
283 fn append_batch_with_cold_admission<'a>(
284 &'a mut self,
285 request: AppendBatchRequest,
286 placement: ShardPlacement,
287 _admission: ColdWriteAdmission,
288 ) -> GroupAppendBatchFuture<'a> {
289 self.append_batch(request, placement)
290 }
291
292 fn append_batch_many_with_cold_admission<'a>(
293 &'a mut self,
294 requests: Vec<AppendBatchRequest>,
295 placement: ShardPlacement,
296 admission: ColdWriteAdmission,
297 ) -> GroupWriteBatchFuture<'a> {
298 Box::pin(async move {
299 let mut responses = Vec::with_capacity(requests.len());
300 for request in requests {
301 let response = self
302 .append_batch_with_cold_admission(request, placement, admission)
303 .await
304 .map(GroupWriteResponse::AppendBatch);
305 responses.push(response);
306 }
307 Ok(responses)
308 })
309 }
310
311 fn flush_cold<'a>(
312 &'a mut self,
313 request: FlushColdRequest,
314 _placement: ShardPlacement,
315 ) -> GroupFlushColdFuture<'a> {
316 Box::pin(async move {
317 Err(GroupEngineError::new(format!(
318 "cold flush is not supported for stream '{}'",
319 request.stream_id
320 )))
321 })
322 }
323
324 fn plan_cold_flush<'a>(
325 &'a mut self,
326 request: PlanColdFlushRequest,
327 _placement: ShardPlacement,
328 ) -> GroupPlanColdFlushFuture<'a> {
329 Box::pin(async move {
330 Err(GroupEngineError::new(format!(
331 "cold flush planning is not supported for stream '{}'",
332 request.stream_id
333 )))
334 })
335 }
336
337 fn plan_next_cold_flush<'a>(
338 &'a mut self,
339 _request: PlanGroupColdFlushRequest,
340 _placement: ShardPlacement,
341 ) -> GroupPlanNextColdFlushFuture<'a> {
342 Box::pin(async move {
343 Err(GroupEngineError::new(
344 "group cold flush planning is not supported",
345 ))
346 })
347 }
348
349 fn plan_next_cold_flush_batch<'a>(
350 &'a mut self,
351 request: PlanGroupColdFlushRequest,
352 placement: ShardPlacement,
353 max_candidates: usize,
354 ) -> GroupPlanNextColdFlushBatchFuture<'a> {
355 Box::pin(async move {
356 match self.plan_next_cold_flush(request, placement).await? {
357 Some(candidate) if max_candidates > 0 => Ok(vec![candidate]),
358 _ => Ok(Vec::new()),
359 }
360 })
361 }
362
363 fn cold_hot_backlog<'a>(
364 &'a mut self,
365 stream_id: BucketStreamId,
366 _placement: ShardPlacement,
367 ) -> GroupColdHotBacklogFuture<'a> {
368 Box::pin(async move {
369 Err(GroupEngineError::new(format!(
370 "cold hot backlog is not supported for stream '{stream_id}'"
371 )))
372 })
373 }
374
375 fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a>;
376
377 fn install_snapshot<'a>(
378 &'a mut self,
379 snapshot: GroupSnapshot,
380 ) -> GroupInstallSnapshotFuture<'a>;
381
382 fn write_batch<'a>(
383 &'a mut self,
384 commands: Vec<GroupWriteCommand>,
385 placement: ShardPlacement,
386 ) -> GroupWriteBatchFuture<'a> {
387 Box::pin(async move {
388 let mut responses = Vec::with_capacity(commands.len());
389 for command in commands {
390 let response = match command {
391 GroupWriteCommand::CreateStream {
392 stream_id,
393 content_type,
394 initial_payload,
395 close_after,
396 stream_seq,
397 producer,
398 stream_ttl_seconds,
399 stream_expires_at_ms,
400 forked_from,
401 fork_offset,
402 now_ms,
403 } => self
404 .create_stream(
405 CreateStreamRequest {
406 stream_id,
407 content_type,
408 content_type_explicit: true,
409 initial_payload,
410 close_after,
411 stream_seq,
412 producer,
413 stream_ttl_seconds,
414 stream_expires_at_ms,
415 forked_from,
416 fork_offset,
417 now_ms,
418 },
419 placement,
420 )
421 .await
422 .map(GroupWriteResponse::CreateStream),
423 GroupWriteCommand::CreateExternal {
424 stream_id,
425 content_type,
426 initial_payload,
427 close_after,
428 stream_seq,
429 producer,
430 stream_ttl_seconds,
431 stream_expires_at_ms,
432 forked_from,
433 fork_offset,
434 now_ms,
435 } => self
436 .create_stream_external(
437 CreateStreamExternalRequest {
438 stream_id,
439 content_type,
440 initial_payload,
441 close_after,
442 stream_seq,
443 producer,
444 stream_ttl_seconds,
445 stream_expires_at_ms,
446 forked_from,
447 fork_offset,
448 now_ms,
449 },
450 placement,
451 )
452 .await
453 .map(GroupWriteResponse::CreateStream),
454 GroupWriteCommand::Append {
455 stream_id,
456 content_type,
457 payload,
458 close_after,
459 stream_seq,
460 producer,
461 now_ms,
462 } => self
463 .append(
464 AppendRequest {
465 stream_id,
466 content_type,
467 payload,
468 close_after,
469 stream_seq,
470 producer,
471 now_ms,
472 },
473 placement,
474 )
475 .await
476 .map(GroupWriteResponse::Append),
477 GroupWriteCommand::AppendExternal {
478 stream_id,
479 content_type,
480 payload,
481 close_after,
482 stream_seq,
483 producer,
484 now_ms,
485 } => self
486 .append_external(
487 AppendExternalRequest {
488 stream_id,
489 content_type,
490 payload,
491 close_after,
492 stream_seq,
493 producer,
494 now_ms,
495 },
496 placement,
497 )
498 .await
499 .map(GroupWriteResponse::Append),
500 GroupWriteCommand::AppendBatch {
501 stream_id,
502 content_type,
503 payloads,
504 producer,
505 now_ms,
506 } => self
507 .append_batch(
508 AppendBatchRequest {
509 stream_id,
510 content_type,
511 payloads,
512 producer,
513 now_ms,
514 },
515 placement,
516 )
517 .await
518 .map(GroupWriteResponse::AppendBatch),
519 GroupWriteCommand::PublishSnapshot {
520 stream_id,
521 snapshot_offset,
522 content_type,
523 payload,
524 now_ms,
525 } => self
526 .publish_snapshot(
527 PublishSnapshotRequest {
528 stream_id,
529 snapshot_offset,
530 content_type,
531 payload,
532 now_ms,
533 },
534 placement,
535 )
536 .await
537 .map(GroupWriteResponse::PublishSnapshot),
538 GroupWriteCommand::TouchStreamAccess {
539 stream_id,
540 now_ms,
541 renew_ttl,
542 } => self
543 .touch_stream_access(stream_id, now_ms, renew_ttl, placement)
544 .await
545 .map(GroupWriteResponse::TouchStreamAccess),
546 GroupWriteCommand::AddForkRef { stream_id, now_ms } => self
547 .add_fork_ref(stream_id, now_ms, placement)
548 .await
549 .map(GroupWriteResponse::AddForkRef),
550 GroupWriteCommand::ReleaseForkRef { stream_id } => self
551 .release_fork_ref(stream_id, placement)
552 .await
553 .map(GroupWriteResponse::ReleaseForkRef),
554 GroupWriteCommand::FlushCold { stream_id, chunk } => self
555 .flush_cold(FlushColdRequest { stream_id, chunk }, placement)
556 .await
557 .map(GroupWriteResponse::FlushCold),
558 GroupWriteCommand::CloseStream {
559 stream_id,
560 stream_seq,
561 producer,
562 now_ms,
563 } => self
564 .close_stream(
565 CloseStreamRequest {
566 stream_id,
567 stream_seq,
568 producer,
569 now_ms,
570 },
571 placement,
572 )
573 .await
574 .map(GroupWriteResponse::CloseStream),
575 GroupWriteCommand::DeleteStream { stream_id } => self
576 .delete_stream(DeleteStreamRequest { stream_id }, placement)
577 .await
578 .map(GroupWriteResponse::DeleteStream),
579 GroupWriteCommand::Batch { commands } => self
580 .write_batch(commands, placement)
581 .await
582 .map(GroupWriteResponse::Batch),
583 };
584 responses.push(response);
585 }
586 Ok(responses)
587 })
588 }
589}
590
591pub trait GroupEngineFactory: Send + Sync + 'static {
592 fn create<'a>(
593 &'a self,
594 placement: ShardPlacement,
595 metrics: GroupEngineMetrics,
596 ) -> GroupEngineCreateFuture<'a>;
597}
598
599#[derive(Debug, Clone)]
600pub struct GroupEngineMetrics {
601 pub(crate) inner: Arc<RuntimeMetricsInner>,
602}
603
604impl GroupEngineMetrics {
605 pub fn record_wal_batch(
606 &self,
607 placement: ShardPlacement,
608 record_count: usize,
609 write_ns: u64,
610 sync_ns: u64,
611 ) {
612 self.inner.record_wal_batch(
613 placement.core_id,
614 placement.raft_group_id,
615 u64::try_from(record_count).expect("record count fits u64"),
616 write_ns,
617 sync_ns,
618 );
619 }
620
621 pub fn record_raft_write_many(
622 &self,
623 placement: ShardPlacement,
624 command_count: usize,
625 logical_command_count: usize,
626 response_count: usize,
627 submit_ns: u64,
628 response_ns: u64,
629 ) {
630 self.inner.record_raft_write_many(
631 placement.core_id,
632 placement.raft_group_id,
633 RaftWriteManySample {
634 command_count: u64::try_from(command_count).expect("command count fits u64"),
635 logical_command_count: u64::try_from(logical_command_count)
636 .expect("logical command count fits u64"),
637 response_count: u64::try_from(response_count).expect("response count fits u64"),
638 submit_ns,
639 response_ns,
640 },
641 );
642 }
643
644 pub fn record_raft_apply_batch(
645 &self,
646 placement: ShardPlacement,
647 entry_count: usize,
648 apply_ns: u64,
649 ) {
650 self.inner.record_raft_apply_batch(
651 placement.core_id,
652 placement.raft_group_id,
653 u64::try_from(entry_count).expect("entry count fits u64"),
654 apply_ns,
655 );
656 }
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
660pub struct GroupLeaderHint {
661 pub node_id: Option<u64>,
662 pub address: Option<String>,
663}
664
665#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
666pub struct GroupEngineError {
667 message: String,
668 code: Option<StreamErrorCode>,
669 next_offset: Option<u64>,
670 #[serde(default, skip_serializing_if = "Option::is_none")]
671 leader_hint: Option<GroupLeaderHint>,
672}
673
674impl GroupEngineError {
675 pub fn new(message: impl Into<String>) -> Self {
676 Self {
677 message: message.into(),
678 code: None,
679 next_offset: None,
680 leader_hint: None,
681 }
682 }
683
684 pub fn stream(code: StreamErrorCode, message: impl Into<String>) -> Self {
685 Self::stream_with_next_offset(code, message, None)
686 }
687
688 pub fn stream_with_next_offset(
689 code: StreamErrorCode,
690 message: impl Into<String>,
691 next_offset: Option<u64>,
692 ) -> Self {
693 Self {
694 message: format!("{code:?}: {}", message.into()),
695 code: Some(code),
696 next_offset,
697 leader_hint: None,
698 }
699 }
700
701 pub fn forward_to_leader(
702 message: impl Into<String>,
703 node_id: Option<u64>,
704 address: Option<String>,
705 ) -> Self {
706 Self {
707 message: message.into(),
708 code: None,
709 next_offset: None,
710 leader_hint: Some(GroupLeaderHint { node_id, address }),
711 }
712 }
713
714 pub fn from_replicated_parts(
715 message: impl Into<String>,
716 code: Option<StreamErrorCode>,
717 next_offset: Option<u64>,
718 leader_hint: Option<GroupLeaderHint>,
719 ) -> Self {
720 Self {
721 message: message.into(),
722 code,
723 next_offset,
724 leader_hint,
725 }
726 }
727
728 pub fn message(&self) -> &str {
729 &self.message
730 }
731
732 pub fn code(&self) -> Option<StreamErrorCode> {
733 self.code
734 }
735
736 pub fn next_offset(&self) -> Option<u64> {
737 self.next_offset
738 }
739
740 pub fn leader_hint(&self) -> Option<&GroupLeaderHint> {
741 self.leader_hint.as_ref()
742 }
743}
744
745impl std::fmt::Display for GroupEngineError {
746 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747 f.write_str(&self.message)
748 }
749}
750
751impl std::error::Error for GroupEngineError {}